Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move pkg oci to Zarf and log during health checks #3106

Merged
merged 9 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/anchore/syft v1.14.0
github.com/avast/retry-go/v4 v4.6.0
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1
github.com/defenseunicorns/pkg/kubernetes v0.3.0
github.com/defenseunicorns/pkg/oci v1.0.2
github.com/derailed/k9s v0.32.5
github.com/distribution/distribution/v3 v3.0.0-beta.1
Expand Down Expand Up @@ -556,7 +555,7 @@ require (
modernc.org/memory v1.8.0 // indirect
modernc.org/sqlite v1.33.1 // indirect
oras.land/oras-go v1.2.5 // indirect
sigs.k8s.io/controller-runtime v0.19.0 // indirect
sigs.k8s.io/controller-runtime v0.19.0
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/kustomize/v5 v5.4.2 // indirect
sigs.k8s.io/release-utils v0.8.4 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,6 @@ github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6 h1:gw
github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6/go.mod h1:StKLYMmPj1R5yIs6CK49EkcW1TvUYuw5Vri+LRk7Dy8=
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1 h1:j08rz9vhyD9Bs+yKiyQMY2tSSejXRMxTqEObZ5M1Wbk=
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1/go.mod h1:u1PAqOICZyiGIVA2v28g55bQH1GiAt0Bc4U9/rnWQvQ=
github.com/defenseunicorns/pkg/kubernetes v0.3.0 h1:f4VSIaUdvn87/dhiZvRbUfHhcHa8bKia6aU0WcvPbYg=
github.com/defenseunicorns/pkg/kubernetes v0.3.0/go.mod h1:FsuKQGpPZOnZWifBse7v787+avtIu2lte5LTsaojDkY=
github.com/defenseunicorns/pkg/oci v1.0.2 h1:JRdFbKnJQiGVsMUWmcmm0ZS8aBmmAORXLGSAGkIGhBQ=
github.com/defenseunicorns/pkg/oci v1.0.2/go.mod h1:z11UFenAd4HQRucaEp0uhoccor/6zbQiXEQq+Z7vtI0=
github.com/deitch/magic v0.0.0-20230404182410-1ff89d7342da h1:ZOjWpVsFZ06eIhnh4mkaceTiVoktdU67+M7KDHJ268M=
Expand Down
100 changes: 97 additions & 3 deletions src/internal/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ package healthchecks

import (
"context"
"strings"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/pkg/message"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)

// Run waits for a list of objects to be reconciled
// Run waits for a list of Zarf healthchecks to reach a ready state.
func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
objs := []object.ObjMetadata{}
for _, hc := range healthChecks {
Expand All @@ -32,9 +38,97 @@ func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1al
}
objs = append(objs, obj)
}
err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
err := WaitForReady(ctx, watcher, objs)
if err != nil {
return err
}
return nil
}

// WaitForReadyRuntime waits for all of the objects to reach a ready state.
func WaitForReadyRuntime(ctx context.Context, sw watcher.StatusWatcher, robjs []runtime.Object) error {
objs := []object.ObjMetadata{}
for _, robj := range robjs {
obj, err := object.RuntimeToObjMeta(robj)
if err != nil {
return err
}
objs = append(objs, obj)
}
return WaitForReady(ctx, sw, objs)
}

// WaitForReady waits for all of the objects to reach a ready state.
func WaitForReady(ctx context.Context, sw watcher.StatusWatcher, objs []object.ObjMetadata) error {
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

eventCh := sw.Watch(cancelCtx, objs, watcher.Options{})
statusCollector := collector.NewResourceStatusCollector(objs)
done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc(
func(statusCollector *collector.ResourceStatusCollector, _ event.Event) {
rss := []*event.ResourceStatus{}
for _, rs := range statusCollector.ResourceStatuses {
if rs == nil {
continue
}
rss = append(rss, rs)
}
desired := status.CurrentStatus
if aggregator.AggregateStatus(rss, desired) == desired {
cancel()
return
}
}),
)
<-done

for _, id := range objs {
rs := statusCollector.ResourceStatuses[id]
switch rs.Status {
case status.CurrentStatus:
message.Debugf("%s: %s ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
case status.NotFoundStatus:
message.Warnf("%s: %s not found", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
default:
message.Warnf("%s: %s not ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
AustinAbro321 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if statusCollector.Error != nil {
return statusCollector.Error
}
// Only check parent context error, otherwise we would error when desired status is achieved.
if ctx.Err() != nil {
return ctx.Err()
}
return nil
}

// ImmediateWatcher should only be used for testing and returns the set status immediately.
type ImmediateWatcher struct {
status status.Status
}

// NewImmediateWatcher returns a ImmediateWatcher.
func NewImmediateWatcher(status status.Status) *ImmediateWatcher {
return &ImmediateWatcher{
status: status,
}
}

// Watch watches the given objects and immediately returns the configured status.
func (w *ImmediateWatcher) Watch(_ context.Context, objs object.ObjMetadataSet, _ watcher.Options) <-chan event.Event {
eventCh := make(chan event.Event, len(objs))
for _, obj := range objs {
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: &event.ResourceStatus{
Identifier: obj,
Status: w.status,
},
}
}
close(eventCh)
return eventCh
}
14 changes: 4 additions & 10 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"helm.sh/helm/v3/pkg/releaseutil"
"helm.sh/helm/v3/pkg/storage/driver"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/yaml"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -129,20 +129,14 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
return nil, "", fmt.Errorf("unable to build the resource list: %w", err)
}

healthChecks := []v1alpha1.NamespacedObjectKindReference{}
runtimeObjs := []runtime.Object{}
for _, resource := range resourceList {
apiVersion, kind := resource.Object.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
healthChecks = append(healthChecks, v1alpha1.NamespacedObjectKindReference{
APIVersion: apiVersion,
Kind: kind,
Name: resource.Name,
Namespace: resource.Namespace,
})
runtimeObjs = append(runtimeObjs, resource.Object)
}
if !h.chart.NoWait {
// Ensure we don't go past the timeout by using a context initialized with the helm timeout
spinner.Updatef("Running health checks")
if err := healthchecks.Run(helmCtx, h.cluster.Watcher, healthChecks); err != nil {
if err := healthchecks.WaitForReadyRuntime(helmCtx, h.cluster.Watcher, runtimeObjs); err != nil {
return nil, "", err
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/internal/packager/helm/zarf.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/object"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/internal/packager/template"
"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -61,7 +60,7 @@ func (h *Helm) UpdateZarfRegistryValues(ctx context.Context) error {
}
waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs)
err = healthchecks.WaitForReady(waitCtx, h.cluster.Watcher, objs)
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +156,7 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
}
waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs)
err = healthchecks.WaitForReady(waitCtx, h.cluster.Watcher, objs)
if err != nil {
return err
}
Expand Down
41 changes: 38 additions & 3 deletions src/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/pkg/message"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

const (
Expand Down Expand Up @@ -76,11 +78,11 @@ func NewClusterWithWait(ctx context.Context) (*Cluster, error) {
// NewCluster creates a new Cluster instance and validates connection to the cluster by fetching the Kubernetes version.
func NewCluster() (*Cluster, error) {
clusterErr := errors.New("unable to connect to the cluster")
clientset, config, err := pkgkubernetes.ClientAndConfig()
clientset, config, err := ClientAndConfig()
if err != nil {
return nil, errors.Join(clusterErr, err)
}
watcher, err := pkgkubernetes.WatcherForConfig(config)
watcher, err := WatcherForConfig(config)
if err != nil {
return nil, errors.Join(clusterErr, err)
}
Expand All @@ -96,3 +98,36 @@ func NewCluster() (*Cluster, error) {
}
return c, nil
}

// ClientAndConfig returns a Kubernetes client and the rest config used to configure the client.
func ClientAndConfig() (kubernetes.Interface, *rest.Config, error) {
loader := clientcmd.NewDefaultClientConfigLoadingRules()
clientCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader, nil)
cfg, err := clientCfg.ClientConfig()
if err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, nil, err
}
return clientset, cfg, nil
}

// WatcherForConfig returns a status watcher for the give Kubernetes configuration.
func WatcherForConfig(cfg *rest.Config) (watcher.StatusWatcher, error) {
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, err
}
restMapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
return nil, err
}
sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper)
return sw, nil
}
4 changes: 2 additions & 2 deletions src/pkg/cluster/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

"github.com/defenseunicorns/pkg/helpers/v2"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/pkg/transform"
"github.com/zarf-dev/zarf/src/pkg/utils"
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *Cluster) StartInjection(ctx context.Context, tmpDir, imagesDir string,

waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReadyRuntime(waitCtx, c.Watcher, []runtime.Object{pod})
err = healthchecks.WaitForReadyRuntime(waitCtx, c.Watcher, []runtime.Object{pod})
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions src/pkg/cluster/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@ import (
"github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/random"
"github.com/stretchr/testify/require"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
)

func TestInjector(t *testing.T) {
ctx := context.Background()
cs := fake.NewSimpleClientset()
c := &Cluster{
Clientset: cs,
Watcher: pkgkubernetes.NewImmediateWatcher(status.CurrentStatus),
Watcher: healthchecks.NewImmediateWatcher(status.CurrentStatus),
}
cs.PrependReactor("delete-collection", "configmaps", func(action k8stesting.Action) (bool, runtime.Object, error) {
delAction, ok := action.(k8stesting.DeleteCollectionActionImpl)
Expand Down
6 changes: 3 additions & 3 deletions src/test/external/ext_in_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"testing"
"time"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/utils/exec"
"github.com/zarf-dev/zarf/src/test/testutil"
Expand Down Expand Up @@ -88,7 +88,7 @@ func (suite *ExtInClusterTestSuite) SetupSuite() {
}
waitCtx, waitCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReady(waitCtx, c.Watcher, objs)
err = healthchecks.WaitForReady(waitCtx, c.Watcher, objs)
suite.NoError(err)
}

Expand Down Expand Up @@ -199,7 +199,7 @@ func (suite *ExtInClusterTestSuite) Test_1_Deploy() {
}
waitCtx, waitCancel := context.WithTimeout(context.Background(), 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReady(waitCtx, c.Watcher, objs)
err = healthchecks.WaitForReady(waitCtx, c.Watcher, objs)
suite.NoError(err)

_, _, err = exec.CmdWithTesting(suite.T(), exec.PrintCfg(), zarfBinPath, "destroy", "--confirm")
Expand Down