Skip to content

Commit

Permalink
fix(status) do not block config updates on failure
Browse files Browse the repository at this point in the history
The initial status update loop setup function,
ctrlutils.PullConfigUpdate, sets up a channel select that receives
ConfigDone events. The config update loop sends to this channel, and
blocks if nothing can receive from it.

Previously, a failure to build the client configuration necessary to
update status would exit PullConfigUpdate. Failure to retrieve the
publish service address(es) would block PullConfigUpdate before it began
receiving events, and environments that never receive addresses
(LoadBalancer Services in clusters that cannot provision LoadBalancers)
would block indefinitely. If either of these occurred, The config update
loop would run once, block, and never run again.

This change avoids this deadlock by having PullConfigUpdate always begin
its channel receive. The ConfigDone receiver attempts to initialize
configuration and status information if they are not marked ready, and
marks them ready if it succeeds. If both are ready, it updates status.
If one or the other is not ready, it logs a debug-level message and does
nothing.
  • Loading branch information
Travis Raines committed Nov 9, 2021
1 parent 7939ad9 commit 458151d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ under the `resources` block for the `configuration.konghq.com` API group.

- The template admission webhook configuration now includes KongClusterPlugins.
[#2000](https://github.com/Kong/kubernetes-ingress-controller/issues/2000)
- Failures to set up the status update subsystem no longer block the update
loop.
[#2005](https://github.com/Kong/kubernetes-ingress-controller/issues/2005)

## [2.0.5]

Expand Down
128 changes: 77 additions & 51 deletions internal/ctrlutils/ingress-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
knative "knative.dev/networking/pkg/apis/networking/v1alpha1"
Expand All @@ -32,71 +33,96 @@ const (
statusUpdateWaitTick = time.Second
)

// PullConfigUpdate is a dedicated function that process ingress/customer resource status update after configuration is updated within kong.
func PullConfigUpdate(
ctx context.Context,
kongConfig sendconfig.Kong,
log logr.Logger,
kubeConfig *rest.Config,
publishService string,
publishAddresses []string,
) {
var hostname string
var ips []string
for {
var err error
ips, hostname, err = RunningAddresses(ctx, kubeConfig, publishService, publishAddresses)
if err != nil {
// in the case that a LoadBalancer type service is being used for the Kong Proxy
// we must wait until the IP address if fully provisioned before we will be able
// to use the reference IPs/Hosts from that LoadBalancer to update resource status addresses.
if err.Error() == proxyLBNotReadyMessage {
log.V(util.InfoLevel).Info("LoadBalancer type Service for the Kong proxy is not yet provisioned, waiting...", "service", publishService)
time.Sleep(time.Second)
continue
}
type statusConfig struct {
ready bool
cli *clientset.Clientset
versionInfo *version.Info
kubernetesVersion semver.Version
kiccli *kicclientset.Clientset
}

log.Error(err, "failed to determine kong proxy external ips/hostnames.")
return
}
break
}
type statusInfo struct {
ready bool
ips []string
hostname string
}

func newStatusConfig(kubeConfig *rest.Config) (statusConfig, error) {
cli, err := clientset.NewForConfig(kubeConfig)
if err != nil {
log.Error(err, "failed to create k8s client.")
return
return statusConfig{ready: false}, err
}

versionInfo, err := cli.ServerVersion()
if err != nil {
log.Error(err, "failed to retrieve cluster version")
return
return statusConfig{ready: false}, err
}

kubernetesVersion, err := semver.Parse(strings.TrimPrefix(versionInfo.String(), "v"))
if err != nil {
log.Error(err, "could not parse cluster version")
return
return statusConfig{ready: false}, err
}

kiccli, err := kicclientset.NewForConfig(kubeConfig)
if err != nil {
log.Error(err, "failed to create kong ingress client.")
return
return statusConfig{ready: false}, err
}
return statusConfig{
ready: true,
cli: cli,
versionInfo: versionInfo,
kubernetesVersion: kubernetesVersion,
kiccli: kiccli,
}, nil
}

// PullConfigUpdate is a dedicated function that process ingress/customer resource status update after configuration is updated within kong.
func PullConfigUpdate(
ctx context.Context,
kongConfig sendconfig.Kong,
log logr.Logger,
kubeConfig *rest.Config,
publishService string,
publishAddresses []string,
) {
cfg := statusConfig{ready: false}
status := statusInfo{ready: false}
var wg sync.WaitGroup
var err error
for {
select {
case updateDone := <-kongConfig.ConfigDone:
log.V(util.DebugLevel).Info("data-plane updates completed, updating resource statuses")
wg.Add(1)
go func() {
if err := UpdateStatuses(ctx, &updateDone, log, cli, kiccli, &wg, ips, hostname, kubeConfig, kubernetesVersion); err != nil {
log.Error(err, "failed to update resource statuses")
if !cfg.ready {
cfg, err = newStatusConfig(kubeConfig)
if err != nil {
log.Error(err, "failed to initialize status updater")
}
}()
}
if !status.ready {
status, err = runningAddresses(ctx, kubeConfig, publishService, publishAddresses)
if err != nil {
if err.Error() == proxyLBNotReadyMessage {
// Separate this into a debug log since it's expected in environments that cannot provision
// LoadBalancers (which we request by default), and will spam logs otherwise.
log.V(util.DebugLevel).Info("LoadBalancer type Service for the Kong proxy has no IPs", "service", publishService)
} else {
log.Error(err, "failed to look up status info for Kong proxy Service", "service", publishService)
}
}
}

if cfg.ready && status.ready {
log.V(util.DebugLevel).Info("data-plane updates completed, updating resource statuses")
wg.Add(1)
go func() {
if err := UpdateStatuses(ctx, &updateDone, log, cfg.cli, cfg.kiccli, &wg, status.ips,
status.hostname, kubeConfig, cfg.kubernetesVersion); err != nil {
log.Error(err, "failed to update resource statuses")
}
}()
} else {
log.V(util.DebugLevel).Info("config or publish service information unavailable, skipping status update")
}
case <-ctx.Done():
log.Info("stop status update channel.")
wg.Wait()
Expand Down Expand Up @@ -506,23 +532,23 @@ func UpdateKnativeIngress(ctx context.Context, log logr.Logger, svc file.FServic

const proxyLBNotReadyMessage = "LoadBalancer service for proxy is not yet ready"

// RunningAddresses retrieve cluster loader balance IP or hostaddress using networking
func RunningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService string,
publishAddresses []string) ([]string, string, error) {
// runningAddresses retrieve cluster loader balance IP or hostaddress using networking
func runningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService string,
publishAddresses []string) (statusInfo, error) {
addrs := []string{}
if len(publishAddresses) > 0 {
addrs = append(addrs, publishAddresses...)
return addrs, "", nil
return statusInfo{ready: true, ips: addrs, hostname: ""}, nil
}
namespace, name, err := util.ParseNameNS(publishService)
if err != nil {
return nil, "", fmt.Errorf("unable to retrieve service for status: %w", err)
return statusInfo{ready: false, ips: addrs, hostname: ""}, fmt.Errorf("unable to retrieve service for status: %w", err)
}

CoreClient, _ := clientset.NewForConfig(kubeCfg)
svc, err := CoreClient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, "", err
return statusInfo{ready: false, ips: addrs, hostname: ""}, err
}

clusterDomain := network.GetClusterDomainName()
Expand All @@ -532,7 +558,7 @@ func RunningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService
switch svc.Spec.Type {
case apiv1.ServiceTypeLoadBalancer:
if len(svc.Status.LoadBalancer.Ingress) < 1 {
return addrs, hostname, fmt.Errorf(proxyLBNotReadyMessage)
return statusInfo{ready: false, ips: addrs, hostname: ""}, fmt.Errorf(proxyLBNotReadyMessage)
}

for _, ip := range svc.Status.LoadBalancer.Ingress {
Expand All @@ -544,9 +570,9 @@ func RunningAddresses(ctx context.Context, kubeCfg *rest.Config, publishService
}

addrs = append(addrs, svc.Spec.ExternalIPs...)
return addrs, hostname, nil
return statusInfo{ready: true, ips: addrs, hostname: hostname}, nil
default:
return addrs, hostname, nil
return statusInfo{ready: true, ips: addrs, hostname: hostname}, nil
}
}

Expand Down

0 comments on commit 458151d

Please sign in to comment.