fix(status) do not block config updates on failure
The initial status update loop setup function,
ctrlutils.PullConfigUpdate, sets up a channel select that receives
ConfigDone events. The config update loop sends to this channel, and
blocks if nothing can receive from it.

Previously, a failure to build the client configuration necessary to
update status would exit PullConfigUpdate. Failure to retrieve the
publish service address(es) would block PullConfigUpdate before it began
receiving events, and environments that never receive addresses
(LoadBalancer Services in clusters that cannot provision LoadBalancers)
would block indefinitely. If either of these occurred, the config update
loop would run once, block, and never run again.

This change avoids this deadlock by having PullConfigUpdate always begin
its channel receive. The ConfigDone receiver attempts to initialize
configuration and status information if they are not marked ready, and
marks them ready if it succeeds. If both are ready, it updates status.
If one or the other is not ready, it logs a debug-level message and does
nothing.
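
To make the fix concrete, here is a minimal, self-contained Go sketch of the pattern this commit adopts (illustrative only — configDone, setupStatusClient, and statusClient are hypothetical stand-ins, not the controller's real identifiers): the receiver is always parked in the channel select, setup is retried lazily on each event, and a failed setup merely skips that update instead of exiting or blocking the sender.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type statusClient struct{ ready bool }

// setupStatusClient stands in for building the Kubernetes clients and looking up
// the publish Service addresses; it fails until the (simulated) LoadBalancer is ready.
func setupStatusClient(attempt int) (statusClient, error) {
	if attempt < 3 {
		return statusClient{}, errors.New("LoadBalancer service has no IPs assigned")
	}
	return statusClient{ready: true}, nil
}

// pullConfigUpdate always sits in the channel receive, so the sender never blocks;
// initialization is retried on each event, and a failure only skips that update.
func pullConfigUpdate(ctx context.Context, configDone <-chan struct{}) {
	cli := statusClient{}
	attempt := 0
	for {
		select {
		case <-configDone:
			attempt++
			if !cli.ready {
				var err error
				if cli, err = setupStatusClient(attempt); err != nil {
					fmt.Println("status updater not ready, skipping status update:", err)
					continue
				}
			}
			fmt.Println("data-plane update received, updating resource statuses")
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	configDone := make(chan struct{})
	go pullConfigUpdate(ctx, configDone)

	// The config update loop keeps sending even while setup is failing; before the
	// fix, a send with no receiver ready would have blocked it forever.
	for i := 0; i < 5; i++ {
		configDone <- struct{}{}
	}
	time.Sleep(50 * time.Millisecond) // let the last update print before exiting
}
```

Before the fix, the equivalent of setupStatusClient ran (and could fail or block) before the receive loop ever started, so a single send from the config update loop was enough to wedge it.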
Travis Raines committed Nov 19, 2021
1 parent dd683ed commit 53682e2
Showing 2 changed files with 101 additions and 69 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -90,6 +90,9 @@ under the `resources` block for the `configuration.konghq.com` API group.
[#2012](https://github.com/Kong/kubernetes-ingress-controller/issues/2012)
- The template admission webhook configuration now includes KongClusterPlugins.
[#2000](https://github.com/Kong/kubernetes-ingress-controller/issues/2000)
- Failures to set up the status update subsystem no longer block the update
loop.
[#2005](https://github.com/Kong/kubernetes-ingress-controller/issues/2005)

#### Under the hood

167 changes: 98 additions & 69 deletions internal/ctrlutils/ingress-status.go
@@ -2,6 +2,7 @@ package ctrlutils

import (
"context"
"errors"
"fmt"
"net"
"sort"
@@ -13,8 +14,9 @@ import (
"github.com/go-logr/logr"
"github.com/kong/deck/file"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
knative "knative.dev/networking/pkg/apis/networking/v1alpha1"
@@ -32,71 +34,100 @@ const (
statusUpdateWaitTick = time.Second
)

// PullConfigUpdate is a dedicated function that process ingress/customer resource status update after configuration is updated within kong.
func PullConfigUpdate(
ctx context.Context,
kongConfig sendconfig.Kong,
log logr.Logger,
kubeConfig *rest.Config,
publishService string,
publishAddresses []string,
) {
var hostname string
var ips []string
for {
var err error
ips, hostname, err = RunningAddresses(ctx, kubeConfig, publishService, publishAddresses)
if err != nil {
// in the case that a LoadBalancer type service is being used for the Kong Proxy
// we must wait until the IP address if fully provisioned before we will be able
// to use the reference IPs/Hosts from that LoadBalancer to update resource status addresses.
if err.Error() == proxyLBNotReadyMessage {
log.V(util.InfoLevel).Info("LoadBalancer type Service for the Kong proxy is not yet provisioned, waiting...", "service", publishService)
time.Sleep(time.Second)
continue
}
var (
errLBNotReady = errors.New("LoadBalancer service has no IPs assigned")
)

log.Error(err, "failed to determine kong proxy external ips/hostnames.")
return
}
break
}
type statusConfig struct {
ready bool
cli *clientset.Clientset
versionInfo *version.Info
kubernetesVersion semver.Version
kiccli *kicclientset.Clientset
}

type statusInfo struct {
ready bool
ips []string
hostname string
}

func newStatusConfig(kubeConfig *rest.Config) (statusConfig, error) {
cli, err := clientset.NewForConfig(kubeConfig)
if err != nil {
log.Error(err, "failed to create k8s client.")
return
return statusConfig{ready: false}, err
}

versionInfo, err := cli.ServerVersion()
if err != nil {
log.Error(err, "failed to retrieve cluster version")
return
return statusConfig{ready: false}, err
}

kubernetesVersion, err := semver.Parse(strings.TrimPrefix(versionInfo.String(), "v"))
if err != nil {
log.Error(err, "could not parse cluster version")
return
return statusConfig{ready: false}, err
}

kiccli, err := kicclientset.NewForConfig(kubeConfig)
if err != nil {
log.Error(err, "failed to create kong ingress client.")
return
return statusConfig{ready: false}, err
}
return statusConfig{
ready: true,
cli: cli,
versionInfo: versionInfo,
kubernetesVersion: kubernetesVersion,
kiccli: kiccli,
}, nil
}

// PullConfigUpdate is a dedicated function that process ingress/customer resource status update after configuration is updated within kong.
func PullConfigUpdate(
ctx context.Context,
kongConfig sendconfig.Kong,
log logr.Logger,
kubeConfig *rest.Config,
publishService string,
publishAddresses []string,
) {
cfg := statusConfig{ready: false}
status := statusInfo{ready: false}
var wg sync.WaitGroup
var err error
for {
select {
case updateDone := <-kongConfig.ConfigDone:
log.V(util.DebugLevel).Info("data-plane updates completed, updating resource statuses")
wg.Add(1)
go func() {
if err := UpdateStatuses(ctx, &updateDone, log, cli, kiccli, &wg, ips, hostname, kubeConfig, kubernetesVersion); err != nil {
log.Error(err, "failed to update resource statuses")
if !cfg.ready {
cfg, err = newStatusConfig(kubeConfig)
if err != nil {
log.Error(err, "failed to initialize status updater")
}
}()
}
if !status.ready {
status, err = runningAddresses(ctx, kubeConfig, publishService, publishAddresses)
if err != nil {
if errors.Is(err, errLBNotReady) {
// Separate this into a debug log since it's expected in environments that cannot provision
// LoadBalancers (which we request by default), and will spam logs otherwise.
log.V(util.DebugLevel).Info("LoadBalancer type Service for the Kong proxy has no IPs", "service", publishService)
} else {
log.Error(err, "failed to look up status info for Kong proxy Service", "service", publishService)
}
}
}

if cfg.ready && status.ready {
log.V(util.DebugLevel).Info("data-plane updates completed, updating resource statuses")
wg.Add(1)
go func() {
if err := UpdateStatuses(ctx, &updateDone, log, cfg.cli, cfg.kiccli, &wg, status.ips,
status.hostname, kubeConfig, cfg.kubernetesVersion); err != nil {
log.Error(err, "failed to update resource statuses")
}
}()
} else {
log.V(util.DebugLevel).Info("config or publish service information unavailable, skipping status update")
}
case <-ctx.Done():
log.Info("stop status update channel.")
wg.Wait()
@@ -204,7 +235,7 @@ func UpdateIngress(
for retry < statusUpdateRetry {
curIng, err := ingCli.Get(ctx, name, metav1.GetOptions{})
if err != nil || curIng == nil {
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to retrieve v1/Ingress: the object is gone, quitting status updates", "namespace", namespace, "name", name)
return nil
}
@@ -231,11 +262,11 @@
if err == nil {
break
}
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1/Ingress object because it is gone, status update stopped", "namespace", namespace, "name", name)
return nil
}
if errors.IsConflict(err) {
if apiErrors.IsConflict(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1/Ingress object because the object has changed: retrying...", "namespace", namespace, "name", name)
} else {
log.V(util.DebugLevel).Info("failed to update the status for v1/Ingress object due to an unexpected error, retrying...", "namespace", namespace, "name", name)
@@ -270,7 +301,7 @@ func UpdateIngressLegacy(
for retry < statusUpdateRetry {
curIng, err := ingCli.Get(ctx, name, metav1.GetOptions{})
if err != nil || curIng == nil {
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to retrieve v1beta1/Ingress: the object is gone, quitting status updates", "namespace", namespace, "name", name)
return nil
}
@@ -297,11 +328,11 @@
if err == nil {
break
}
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/Ingress object because it is gone, status update stopped", "namespace", namespace, "name", name)
return nil
}
if errors.IsConflict(err) {
if apiErrors.IsConflict(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/Ingress object because the object has changed: retrying...", "namespace", namespace, "name", name)
} else {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/Ingress object due to an unexpected error, retrying...", "namespace", namespace, "name", name)
@@ -329,7 +360,7 @@ func UpdateUDPIngress(ctx context.Context, log logr.Logger, svc file.FService, k
for retry < statusUpdateRetry {
curIng, err := ingCli.Get(ctx, name, metav1.GetOptions{})
if err != nil || curIng == nil {
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to retrieve v1beta1/UDPIngress: the object is gone, quitting status updates", "namespace", namespace, "name", name)
return nil
}
@@ -356,11 +387,11 @@
if err == nil {
break
}
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/UDPIngress object because it is gone, status update stopped", "namespace", namespace, "name", name)
return nil
}
if errors.IsConflict(err) {
if apiErrors.IsConflict(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/UDPIngress object because the object has changed: retrying...", "namespace", namespace, "name", name)
} else {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/UDPIngress object due to an unexpected error, retrying...", "namespace", namespace, "name", name)
@@ -390,7 +421,7 @@ func UpdateTCPIngress(ctx context.Context, log logr.Logger, svc file.FService, k
for retry < statusUpdateRetry {
curIng, err := ingCli.Get(ctx, name, metav1.GetOptions{})
if err != nil || curIng == nil {
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to retrieve v1beta1/TCPIngress: the object is gone, quitting status updates", "namespace", namespace, "name", name)
return nil
}
@@ -413,11 +444,11 @@
if err == nil {
break
}
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/TCPIngress object because it is gone, status update stopped", "namespace", namespace, "name", name)
return nil
}
if errors.IsConflict(err) {
if apiErrors.IsConflict(err) {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/TCPIngress object because the object has changed: retrying...", "namespace", namespace, "name", name)
} else {
log.V(util.DebugLevel).Info("failed to update the status for v1beta1/TCPIngress object due to an unexpected error, retrying...", "namespace", namespace, "name", name)
@@ -453,7 +484,7 @@ func UpdateKnativeIngress(ctx context.Context, log logr.Logger, svc file.FServic
for retry < statusUpdateRetry {
curIng, err := ingClient.Get(ctx, name, metav1.GetOptions{})
if err != nil || curIng == nil {
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to retrieve knative/Ingress: the object is gone, quitting status updates", "namespace", namespace, "name", name)
return nil
}
@@ -491,11 +522,11 @@ func UpdateKnativeIngress(ctx context.Context, log logr.Logger, svc file.FServic
if err == nil {
break
}
if errors.IsNotFound(err) {
if apiErrors.IsNotFound(err) {
log.V(util.DebugLevel).Info("failed to update the status for knative/Ingress object because it is gone, status update stopped", "namespace", namespace, "name", name)
return nil
}
if errors.IsConflict(err) {
if apiErrors.IsConflict(err) {
log.V(util.DebugLevel).Info("failed to update the status for knative/Ingress object because the object has changed: retrying...", "namespace", namespace, "name", name)
} else {
log.V(util.DebugLevel).Info("failed to update the status for knative/Ingress object due to an unexpected error, retrying...", "namespace", namespace, "name", name)
@@ -510,25 +541,23 @@ func UpdateKnativeIngress(ctx context.Context, log logr.Logger, svc file.FServic
return nil
}

const proxyLBNotReadyMessage = "LoadBalancer service for proxy is not yet ready"

// RunningAddresses retrieve cluster loader balance IP or hostaddress using networking
func RunningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService string,
publishAddresses []string) ([]string, string, error) {
// runningAddresses retrieve cluster loader balance IP or hostaddress using networking
func runningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService string,
publishAddresses []string) (statusInfo, error) {
addrs := []string{}
if len(publishAddresses) > 0 {
addrs = append(addrs, publishAddresses...)
return addrs, "", nil
return statusInfo{ready: true, ips: addrs, hostname: ""}, nil
}
namespace, name, err := util.ParseNameNS(publishService)
if err != nil {
return nil, "", fmt.Errorf("unable to retrieve service for status: %w", err)
return statusInfo{ready: false, ips: addrs, hostname: ""}, fmt.Errorf("unable to retrieve service for status: %w", err)
}

CoreClient, _ := clientset.NewForConfig(kubeCfg)
svc, err := CoreClient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, "", err
return statusInfo{ready: false, ips: addrs, hostname: ""}, err
}

clusterDomain := network.GetClusterDomainName()
@@ -538,7 +567,7 @@ func RunningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService
switch svc.Spec.Type {
case apiv1.ServiceTypeLoadBalancer:
if len(svc.Status.LoadBalancer.Ingress) < 1 {
return addrs, hostname, fmt.Errorf(proxyLBNotReadyMessage)
return statusInfo{ready: false, ips: addrs, hostname: ""}, errLBNotReady
}

for _, ip := range svc.Status.LoadBalancer.Ingress {
@@ -550,9 +579,9 @@
}

addrs = append(addrs, svc.Spec.ExternalIPs...)
return addrs, hostname, nil
return statusInfo{ready: true, ips: addrs, hostname: hostname}, nil
default:
return addrs, hostname, nil
return statusInfo{ready: true, ips: addrs, hostname: hostname}, nil
}
}

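
One smaller detail worth calling out from the diff: the removed code matched the "LoadBalancer not ready" condition by comparing error message strings, whereas the new code defines the errLBNotReady sentinel and checks it with errors.Is, which keeps working even if the error is later wrapped. A minimal sketch of that idiom (the names below are illustrative, not taken from the controller):

```go
package main

import (
	"errors"
	"fmt"
)

// errNotReady is a sentinel error value; callers detect it with errors.Is
// instead of comparing message strings.
var errNotReady = errors.New("service has no address assigned")

func lookupAddress(ready bool) (string, error) {
	if !ready {
		// Wrapping with %w keeps the sentinel detectable while adding context.
		return "", fmt.Errorf("looking up publish service: %w", errNotReady)
	}
	return "203.0.113.10", nil
}

func main() {
	if _, err := lookupAddress(false); errors.Is(err, errNotReady) {
		fmt.Println("address not ready yet, will retry on the next update:", err)
	}
}
```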
