Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check readiness of Flux kinds using kstatus #4311

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/flux/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e

logger.Waitingf("waiting for %s reconciliation", names.kind)
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isReady(kubeClient, namespacedName, object)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, object.asClientObject())); err != nil {
return err
}
logger.Successf("%s reconciliation completed", names.kind)
Expand Down
22 changes: 1 addition & 21 deletions cmd/flux/create_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,7 +132,7 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Alert reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isAlertReady(kubeClient, namespacedName, &alert)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &alert)); err != nil {
return err
}
logger.Successf("Alert %s is ready", name)
Expand Down Expand Up @@ -170,22 +169,3 @@ func upsertAlert(ctx context.Context, kubeClient client.Client,
logger.Successf("Alert updated")
return namespacedName, nil
}

func isAlertReady(kubeClient client.Client, namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, alert)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 1 addition & 21 deletions cmd/flux/create_alertprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -128,7 +127,7 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Provider reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isAlertProviderReady(kubeClient, namespacedName, &provider)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &provider)); err != nil {
return err
}

Expand Down Expand Up @@ -167,22 +166,3 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client,
logger.Successf("Provider updated")
return namespacedName, nil
}

func isAlertProviderReady(kubeClient client.Client, namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, provider)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
19 changes: 1 addition & 18 deletions cmd/flux/create_helmrelease.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -304,7 +303,7 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for HelmRelease reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isHelmReleaseReady(kubeClient, namespacedName, &helmRelease)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &helmRelease)); err != nil {
return err
}
logger.Successf("HelmRelease %s is ready", name)
Expand Down Expand Up @@ -344,22 +343,6 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client,
return namespacedName, nil
}

func isHelmReleaseReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRelease)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if helmRelease.Generation != helmRelease.Status.ObservedGeneration {
return false, nil
}

return apimeta.IsStatusConditionTrue(helmRelease.Status.Conditions, meta.ReadyCondition), nil
}
}

func validateStrategy(input string) bool {
allowedStrategy := []string{"Revision", "ChartVersion"}

Expand Down
27 changes: 1 addition & 26 deletions cmd/flux/create_kustomization.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -264,7 +263,7 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Kustomization reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isKustomizationReady(kubeClient, namespacedName, &kustomization)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &kustomization)); err != nil {
return err
}
logger.Successf("Kustomization %s is ready", name)
Expand Down Expand Up @@ -303,27 +302,3 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client,
logger.Successf("Kustomization updated")
return namespacedName, nil
}

func isKustomizationReady(kubeClient client.Client, namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, kustomization)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if kustomization.Generation != kustomization.Status.ObservedGeneration {
return false, nil
}

if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 1 addition & 21 deletions cmd/flux/create_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -140,7 +139,7 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Receiver reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &receiver)); err != nil {
return err
}
logger.Successf("Receiver %s is ready", name)
Expand Down Expand Up @@ -179,22 +178,3 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client,
logger.Successf("Receiver updated")
return namespacedName, nil
}

func isReceiverReady(kubeClient client.Client, namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, receiver)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
29 changes: 1 addition & 28 deletions cmd/flux/create_source_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"

Expand Down Expand Up @@ -205,7 +204,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for Bucket source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isBucketReady(kubeClient, namespacedName, bucket)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, bucket)); err != nil {
return err
}
logger.Successf("Bucket source reconciliation completed")
Expand Down Expand Up @@ -247,29 +246,3 @@ func upsertBucket(ctx context.Context, kubeClient client.Client,
logger.Successf("Bucket source updated")
return namespacedName, nil
}

func isBucketReady(kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, bucket)
if err != nil {
return false, err
}

if c := conditions.Get(bucket, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != bucket.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
29 changes: 1 addition & 28 deletions cmd/flux/create_source_git.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/yaml"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1"

Expand Down Expand Up @@ -326,7 +325,7 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for GitRepository source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isGitRepositoryReady(kubeClient, namespacedName, &gitRepository)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, &gitRepository)); err != nil {
return err
}
logger.Successf("GitRepository source reconciliation completed")
Expand Down Expand Up @@ -368,29 +367,3 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("GitRepository source updated")
return namespacedName, nil
}

func isGitRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, gitRepository)
if err != nil {
return false, err
}

if c := conditions.Get(gitRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != gitRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
11 changes: 11 additions & 0 deletions cmd/flux/create_source_git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,21 @@ func TestCreateSourceGit(t *testing.T) {
Time: time.Now(),
},
}
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"Failed",
command,
assertError("failed message"),
func(repo *sourcev1.GitRepository) {
stalledCondition := metav1.Condition{
Type: meta.StalledCondition,
Status: metav1.ConditionTrue,
Reason: sourcev1.URLInvalidReason,
Message: "failed message",
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, stalledCondition)
newCondition := metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionFalse,
Expand All @@ -195,6 +204,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"NoArtifact",
Expand All @@ -210,6 +220,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
},
}
Expand Down
29 changes: 1 addition & 28 deletions cmd/flux/create_source_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -232,7 +231,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error {

logger.Waitingf("waiting for HelmRepository source reconciliation")
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isHelmRepositoryReady(kubeClient, namespacedName, helmRepository)); err != nil {
isObjectReadyConditionFunc(kubeClient, namespacedName, helmRepository)); err != nil {
return err
}
logger.Successf("HelmRepository source reconciliation completed")
Expand Down Expand Up @@ -279,29 +278,3 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("source updated")
return namespacedName, nil
}

func isHelmRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRepository)
if err != nil {
return false, err
}

if c := conditions.Get(helmRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != helmRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
Loading
Loading