diff --git a/go.mod b/go.mod
index 75c52d5f4f..487438bc8c 100644
--- a/go.mod
+++ b/go.mod
@@ -17,7 +17,6 @@ require (
 	github.com/anchore/syft v1.14.0
 	github.com/avast/retry-go/v4 v4.6.0
 	github.com/defenseunicorns/pkg/helpers/v2 v2.0.1
-	github.com/defenseunicorns/pkg/kubernetes v0.3.0
 	github.com/defenseunicorns/pkg/oci v1.0.2
 	github.com/derailed/k9s v0.32.5
 	github.com/distribution/distribution/v3 v3.0.0-beta.1
@@ -556,7 +555,7 @@ require (
 	modernc.org/memory v1.8.0 // indirect
 	modernc.org/sqlite v1.33.1 // indirect
 	oras.land/oras-go v1.2.5 // indirect
-	sigs.k8s.io/controller-runtime v0.19.0 // indirect
+	sigs.k8s.io/controller-runtime v0.19.0
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
 	sigs.k8s.io/kustomize/kustomize/v5 v5.4.2 // indirect
 	sigs.k8s.io/release-utils v0.8.4 // indirect
diff --git a/go.sum b/go.sum
index f1f84685d0..73cfde145c 100644
--- a/go.sum
+++ b/go.sum
@@ -621,8 +621,6 @@ github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6 h1:gw
 github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6/go.mod h1:StKLYMmPj1R5yIs6CK49EkcW1TvUYuw5Vri+LRk7Dy8=
 github.com/defenseunicorns/pkg/helpers/v2 v2.0.1 h1:j08rz9vhyD9Bs+yKiyQMY2tSSejXRMxTqEObZ5M1Wbk=
 github.com/defenseunicorns/pkg/helpers/v2 v2.0.1/go.mod h1:u1PAqOICZyiGIVA2v28g55bQH1GiAt0Bc4U9/rnWQvQ=
-github.com/defenseunicorns/pkg/kubernetes v0.3.0 h1:f4VSIaUdvn87/dhiZvRbUfHhcHa8bKia6aU0WcvPbYg=
-github.com/defenseunicorns/pkg/kubernetes v0.3.0/go.mod h1:FsuKQGpPZOnZWifBse7v787+avtIu2lte5LTsaojDkY=
 github.com/defenseunicorns/pkg/oci v1.0.2 h1:JRdFbKnJQiGVsMUWmcmm0ZS8aBmmAORXLGSAGkIGhBQ=
 github.com/defenseunicorns/pkg/oci v1.0.2/go.mod h1:z11UFenAd4HQRucaEp0uhoccor/6zbQiXEQq+Z7vtI0=
 github.com/deitch/magic v0.0.0-20230404182410-1ff89d7342da h1:ZOjWpVsFZ06eIhnh4mkaceTiVoktdU67+M7KDHJ268M=
diff --git a/src/internal/healthchecks/healthchecks.go b/src/internal/healthchecks/healthchecks.go
index 7957953378..447ddb81cf 100644
--- a/src/internal/healthchecks/healthchecks.go
+++ b/src/internal/healthchecks/healthchecks.go
@@ -6,15 +6,21 @@ package healthchecks
 
 import (
 	"context"
+	"errors"
+	"fmt"
 
-	pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
 	"github.com/zarf-dev/zarf/src/api/v1alpha1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
+	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
 	"sigs.k8s.io/cli-utils/pkg/object"
 )
 
-// Run waits for a list of objects to be reconciled
+// Run waits for a list of Zarf healthchecks to reach a ready state.
 func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
 	objs := []object.ObjMetadata{}
 	for _, hc := range healthChecks {
@@ -32,9 +38,99 @@ func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1al
 		}
 		objs = append(objs, obj)
 	}
-	err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
+	err := WaitForReady(ctx, watcher, objs)
 	if err != nil {
 		return err
 	}
 	return nil
 }
+
+// WaitForReadyRuntime waits for all of the objects to reach a ready state.
+func WaitForReadyRuntime(ctx context.Context, sw watcher.StatusWatcher, robjs []runtime.Object) error {
+	objs := []object.ObjMetadata{}
+	for _, robj := range robjs {
+		obj, err := object.RuntimeToObjMeta(robj)
+		if err != nil {
+			return err
+		}
+		objs = append(objs, obj)
+	}
+	return WaitForReady(ctx, sw, objs)
+}
+
+// WaitForReady waits for all of the objects to reach a ready state.
+func WaitForReady(ctx context.Context, sw watcher.StatusWatcher, objs []object.ObjMetadata) error {
+	cancelCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	eventCh := sw.Watch(cancelCtx, objs, watcher.Options{})
+	statusCollector := collector.NewResourceStatusCollector(objs)
+	done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc(
+		func(statusCollector *collector.ResourceStatusCollector, _ event.Event) {
+			rss := []*event.ResourceStatus{}
+			for _, rs := range statusCollector.ResourceStatuses {
+				if rs == nil {
+					continue
+				}
+				rss = append(rss, rs)
+			}
+			desired := status.CurrentStatus
+			if aggregator.AggregateStatus(rss, desired) == desired {
+				cancel()
+				return
+			}
+		}),
+	)
+	<-done
+
+	if statusCollector.Error != nil {
+		return statusCollector.Error
+	}
+
+	// Only check parent context error, otherwise we would error when desired status is achieved.
+	if ctx.Err() != nil {
+		errs := []error{}
+		for _, id := range objs {
+			rs := statusCollector.ResourceStatuses[id]
+			switch rs.Status {
+			case status.CurrentStatus:
+			case status.NotFoundStatus:
+				errs = append(errs, fmt.Errorf("%s: %s not found", rs.Identifier.Name, rs.Identifier.GroupKind.Kind))
+			default:
+				errs = append(errs, fmt.Errorf("%s: %s not ready", rs.Identifier.Name, rs.Identifier.GroupKind.Kind))
+			}
+		}
+		errs = append(errs, ctx.Err())
+		return errors.Join(errs...)
+	}
+
+	return nil
+}
+
+// ImmediateWatcher should only be used for testing and returns the set status immediately.
+type ImmediateWatcher struct {
+	status status.Status
+}
+
+// NewImmediateWatcher returns an ImmediateWatcher.
+func NewImmediateWatcher(status status.Status) *ImmediateWatcher {
+	return &ImmediateWatcher{
+		status: status,
+	}
+}
+
+// Watch watches the given objects and immediately returns the configured status.
+func (w *ImmediateWatcher) Watch(_ context.Context, objs object.ObjMetadataSet, _ watcher.Options) <-chan event.Event {
+	eventCh := make(chan event.Event, len(objs))
+	for _, obj := range objs {
+		eventCh <- event.Event{
+			Type: event.ResourceUpdateEvent,
+			Resource: &event.ResourceStatus{
+				Identifier: obj,
+				Status:     w.status,
+			},
+		}
+	}
+	close(eventCh)
+	return eventCh
+}
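Usage sketch (not part of the diff): `Run` remains the entry point for declared component health checks, now delegating to the in-repo `WaitForReady`. The Deployment reference, caller shape, and 60-second budget below are invented for illustration; only the `healthchecks.Run` signature comes from this change, and the `internal` package is importable only from within the Zarf module:

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/zarf-dev/zarf/src/api/v1alpha1"
	"github.com/zarf-dev/zarf/src/internal/healthchecks"
	"github.com/zarf-dev/zarf/src/pkg/cluster"
)

// runHealthChecks is a hypothetical caller; the object reference and
// timeout are made up for illustration.
func runHealthChecks(ctx context.Context, c *cluster.Cluster) error {
	hc := []v1alpha1.NamespacedObjectKindReference{{
		APIVersion: "apps/v1",
		Kind:       "Deployment",
		Namespace:  "podinfo",
		Name:       "podinfo",
	}}
	// Bound the wait so a stuck rollout cannot block forever.
	waitCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()
	if err := healthchecks.Run(waitCtx, c.Watcher, hc); err != nil {
		return fmt.Errorf("health checks failed: %w", err)
	}
	return nil
}
```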
diff --git a/src/internal/healthchecks/healthchecks_test.go b/src/internal/healthchecks/healthchecks_test.go
index 9761f7ba84..d52d0d2ff9 100644
--- a/src/internal/healthchecks/healthchecks_test.go
+++ b/src/internal/healthchecks/healthchecks_test.go
@@ -6,6 +6,7 @@ package healthchecks
 
 import (
 	"context"
+	"errors"
 	"testing"
 	"time"
 
@@ -45,19 +46,19 @@ metadata:
 func TestRunHealthChecks(t *testing.T) {
 	t.Parallel()
 	tests := []struct {
-		name      string
-		podYaml   string
-		expectErr error
+		name       string
+		podYamls   []string
+		expectErrs []error
 	}{
 		{
-			name:      "Pod is running",
-			podYaml:   podCurrentYaml,
-			expectErr: nil,
+			name:       "Pod is ready",
+			podYamls:   []string{podCurrentYaml},
+			expectErrs: nil,
 		},
 		{
-			name:      "Pod is never ready",
-			podYaml:   podYaml,
-			expectErr: context.DeadlineExceeded,
+			name:       "One pod is never ready",
+			podYamls:   []string{podYaml, podCurrentYaml},
+			expectErrs: []error{errors.New("in-progress-pod: Pod not ready"), context.DeadlineExceeded},
 		},
 	}
 
@@ -70,24 +71,27 @@ func TestRunHealthChecks(t *testing.T) {
 			)
 			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
 			defer cancel()
-			m := make(map[string]interface{})
-			err := yaml.Unmarshal([]byte(tt.podYaml), &m)
-			require.NoError(t, err)
-			pod := &unstructured.Unstructured{Object: m}
 			statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
-			podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
-			require.NoError(t, fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace()))
-			objs := []v1alpha1.NamespacedObjectKindReference{
-				{
+			objs := []v1alpha1.NamespacedObjectKindReference{}
+			for _, podYaml := range tt.podYamls {
+				m := make(map[string]interface{})
+				err := yaml.Unmarshal([]byte(podYaml), &m)
+				require.NoError(t, err)
+				pod := &unstructured.Unstructured{Object: m}
+				podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
+				err = fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace())
+				require.NoError(t, err)
+				objs = append(objs, v1alpha1.NamespacedObjectKindReference{
 					APIVersion: pod.GetAPIVersion(),
 					Kind:       pod.GetKind(),
 					Namespace:  pod.GetNamespace(),
 					Name:       pod.GetName(),
-				},
+				})
 			}
-			err = Run(ctx, statusWatcher, objs)
-			if tt.expectErr != nil {
-				require.ErrorIs(t, err, tt.expectErr)
+
+			err := Run(ctx, statusWatcher, objs)
+			if tt.expectErrs != nil {
+				require.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
 				return
 			}
 			require.NoError(t, err)
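Testing sketch (not part of the diff): `ImmediateWatcher` makes the wait helpers unit-testable without a cluster, as the injector test below uses it. This hypothetical test exercises `WaitForReadyRuntime` the same way; the pod values are invented, and whether bare typed structs need `TypeMeta` set is an assumption noted in the comment:

```go
package healthchecks_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/zarf-dev/zarf/src/internal/healthchecks"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
)

// TestWaitForReadyRuntimeImmediate is a hypothetical test: the watcher
// reports CurrentStatus for every object at once, so the wait returns
// immediately without touching a real cluster.
func TestWaitForReadyRuntimeImmediate(t *testing.T) {
	pod := &corev1.Pod{
		// Assumption: TypeMeta is set explicitly so RuntimeToObjMeta can
		// derive a GroupKind, which is empty on bare typed structs.
		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Pod"},
		ObjectMeta: metav1.ObjectMeta{Namespace: "zarf", Name: "injector"},
	}
	sw := healthchecks.NewImmediateWatcher(status.CurrentStatus)
	err := healthchecks.WaitForReadyRuntime(context.Background(), sw, []runtime.Object{pod})
	require.NoError(t, err)
}
```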
diff --git a/src/internal/packager/helm/chart.go b/src/internal/packager/helm/chart.go
index 6009d5f9bc..954b1d6144 100644
--- a/src/internal/packager/helm/chart.go
+++ b/src/internal/packager/helm/chart.go
@@ -23,9 +23,9 @@ import (
 	"helm.sh/helm/v3/pkg/releaseutil"
 	"helm.sh/helm/v3/pkg/storage/driver"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"sigs.k8s.io/yaml"
 
-	"github.com/zarf-dev/zarf/src/api/v1alpha1"
 	"github.com/zarf-dev/zarf/src/config"
 	"github.com/zarf-dev/zarf/src/internal/healthchecks"
 	"github.com/zarf-dev/zarf/src/pkg/message"
@@ -129,20 +129,14 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
 		return nil, "", fmt.Errorf("unable to build the resource list: %w", err)
 	}
 
-	healthChecks := []v1alpha1.NamespacedObjectKindReference{}
+	runtimeObjs := []runtime.Object{}
 	for _, resource := range resourceList {
-		apiVersion, kind := resource.Object.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
-		healthChecks = append(healthChecks, v1alpha1.NamespacedObjectKindReference{
-			APIVersion: apiVersion,
-			Kind:       kind,
-			Name:       resource.Name,
-			Namespace:  resource.Namespace,
-		})
+		runtimeObjs = append(runtimeObjs, resource.Object)
 	}
 	if !h.chart.NoWait {
 		// Ensure we don't go past the timeout by using a context initialized with the helm timeout
 		spinner.Updatef("Running health checks")
-		if err := healthchecks.Run(helmCtx, h.cluster.Watcher, healthChecks); err != nil {
+		if err := healthchecks.WaitForReadyRuntime(helmCtx, h.cluster.Watcher, runtimeObjs); err != nil {
 			return nil, "", err
 		}
 	}
diff --git a/src/internal/packager/helm/zarf.go b/src/internal/packager/helm/zarf.go
index 03e4db2ed0..b6945a6e8c 100644
--- a/src/internal/packager/helm/zarf.go
+++ b/src/internal/packager/helm/zarf.go
@@ -13,11 +13,10 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"sigs.k8s.io/cli-utils/pkg/object"
 
-	pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
-
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/zarf-dev/zarf/src/api/v1alpha1"
+	"github.com/zarf-dev/zarf/src/internal/healthchecks"
 	"github.com/zarf-dev/zarf/src/internal/packager/template"
 	"github.com/zarf-dev/zarf/src/pkg/cluster"
 	"github.com/zarf-dev/zarf/src/pkg/message"
@@ -61,7 +60,7 @@ func (h *Helm) UpdateZarfRegistryValues(ctx context.Context) error {
 	}
 	waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
 	defer waitCancel()
-	err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs)
+	err = healthchecks.WaitForReady(waitCtx, h.cluster.Watcher, objs)
 	if err != nil {
 		return err
 	}
@@ -157,7 +156,7 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
 	}
 	waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
 	defer waitCancel()
-	err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs)
+	err = healthchecks.WaitForReady(waitCtx, h.cluster.Watcher, objs)
 	if err != nil {
 		return err
 	}
diff --git a/src/pkg/cluster/cluster.go b/src/pkg/cluster/cluster.go
index 5db77e0c9c..a97b3066bc 100644
--- a/src/pkg/cluster/cluster.go
+++ b/src/pkg/cluster/cluster.go
@@ -17,9 +17,11 @@ import (
 	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
 
 	"github.com/avast/retry-go/v4"
-	pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
 
 	"github.com/zarf-dev/zarf/src/pkg/message"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/tools/clientcmd"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 )
 
 const (
@@ -76,11 +78,11 @@ func NewClusterWithWait(ctx context.Context) (*Cluster, error) {
 // NewCluster creates a new Cluster instance and validates connection to the cluster by fetching the Kubernetes version.
 func NewCluster() (*Cluster, error) {
 	clusterErr := errors.New("unable to connect to the cluster")
-	clientset, config, err := pkgkubernetes.ClientAndConfig()
+	clientset, config, err := ClientAndConfig()
 	if err != nil {
 		return nil, errors.Join(clusterErr, err)
 	}
-	watcher, err := pkgkubernetes.WatcherForConfig(config)
+	watcher, err := WatcherForConfig(config)
 	if err != nil {
 		return nil, errors.Join(clusterErr, err)
 	}
@@ -96,3 +98,36 @@ func NewCluster() (*Cluster, error) {
 	}
 	return c, nil
 }
+
+// ClientAndConfig returns a Kubernetes client and the rest config used to configure the client.
+func ClientAndConfig() (kubernetes.Interface, *rest.Config, error) {
+	loader := clientcmd.NewDefaultClientConfigLoadingRules()
+	clientCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader, nil)
+	cfg, err := clientCfg.ClientConfig()
+	if err != nil {
+		return nil, nil, err
+	}
+	clientset, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return nil, nil, err
+	}
+	return clientset, cfg, nil
+}
+
+// WatcherForConfig returns a status watcher for the given Kubernetes configuration.
+func WatcherForConfig(cfg *rest.Config) (watcher.StatusWatcher, error) {
+	dynamicClient, err := dynamic.NewForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	httpClient, err := rest.HTTPClientFor(cfg)
+	if err != nil {
+		return nil, err
+	}
+	restMapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
+	if err != nil {
+		return nil, err
+	}
+	sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper)
+	return sw, nil
+}
diff --git a/src/pkg/cluster/injector.go b/src/pkg/cluster/injector.go
index 48552ac5e1..c0d36e21e7 100644
--- a/src/pkg/cluster/injector.go
+++ b/src/pkg/cluster/injector.go
@@ -24,9 +24,9 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/defenseunicorns/pkg/helpers/v2"
-	pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
 
 	"github.com/zarf-dev/zarf/src/config"
+	"github.com/zarf-dev/zarf/src/internal/healthchecks"
 	"github.com/zarf-dev/zarf/src/pkg/message"
 	"github.com/zarf-dev/zarf/src/pkg/transform"
 	"github.com/zarf-dev/zarf/src/pkg/utils"
@@ -117,7 +117,7 @@ func (c *Cluster) StartInjection(ctx context.Context, tmpDir, imagesDir string,
 
 	waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
 	defer waitCancel()
-	err = pkgkubernetes.WaitForReadyRuntime(waitCtx, c.Watcher, []runtime.Object{pod})
+	err = healthchecks.WaitForReadyRuntime(waitCtx, c.Watcher, []runtime.Object{pod})
 	if err != nil {
 		return err
 	}
diff --git a/src/pkg/cluster/injector_test.go b/src/pkg/cluster/injector_test.go
index 67dff422f2..e5acdf3227 100644
--- a/src/pkg/cluster/injector_test.go
+++ b/src/pkg/cluster/injector_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/google/go-containerregistry/pkg/v1/layout"
 	"github.com/google/go-containerregistry/pkg/v1/random"
 	"github.com/stretchr/testify/require"
+	"github.com/zarf-dev/zarf/src/internal/healthchecks"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -22,8 +23,6 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	k8stesting "k8s.io/client-go/testing"
 	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
-
-	pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
 )
 
 func TestInjector(t *testing.T) {
@@ -31,7 +30,7 @@ func TestInjector(t *testing.T) {
 	cs := fake.NewSimpleClientset()
 	c := &Cluster{
 		Clientset: cs,
-		Watcher:   pkgkubernetes.NewImmediateWatcher(status.CurrentStatus),
+		Watcher:   healthchecks.NewImmediateWatcher(status.CurrentStatus),
 	}
 	cs.PrependReactor("delete-collection", "configmaps", func(action k8stesting.Action) (bool, runtime.Object, error) {
 		delAction, ok := action.(k8stesting.DeleteCollectionActionImpl)
diff --git a/src/test/external/ext_in_cluster_test.go b/src/test/external/ext_in_cluster_test.go
index 81102c0b14..1b4f668d8c 100644
--- a/src/test/external/ext_in_cluster_test.go
+++ b/src/test/external/ext_in_cluster_test.go
@@ -14,9 +14,9 @@ import (
 	"testing"
 	"time"
 
-	pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+	"github.com/zarf-dev/zarf/src/internal/healthchecks"
 	"github.com/zarf-dev/zarf/src/pkg/cluster"
 	"github.com/zarf-dev/zarf/src/pkg/utils/exec"
 	"github.com/zarf-dev/zarf/src/test/testutil"
@@ -88,7 +88,7 @@ func (suite *ExtInClusterTestSuite) SetupSuite() {
 	}
 	waitCtx, waitCancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer waitCancel()
-	err = pkgkubernetes.WaitForReady(waitCtx, c.Watcher, objs)
+	err = healthchecks.WaitForReady(waitCtx, c.Watcher, objs)
 	suite.NoError(err)
 }
 
@@ -199,7 +199,7 @@ func (suite *ExtInClusterTestSuite) Test_1_Deploy() {
 	}
 	waitCtx, waitCancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer waitCancel()
-	err = pkgkubernetes.WaitForReady(waitCtx, c.Watcher, objs)
+	err = healthchecks.WaitForReady(waitCtx, c.Watcher, objs)
 	suite.NoError(err)
 
 	_, _, err = exec.CmdWithTesting(suite.T(), exec.PrintCfg(), zarfBinPath, "destroy", "--confirm")