Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modulereleasemeta catalog sync #2014

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/controller/kyma/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,15 @@ func (r *Reconciler) syncModuleCatalog(ctx context.Context, kyma *v1beta2.Kyma)
modulesToSync = append(modulesToSync, mt)
}
}

moduleReleaseMetaList := &v1beta2.ModuleReleaseMetaList{}
if err := r.List(ctx, moduleReleaseMetaList, &client.ListOptions{}); err != nil {
return fmt.Errorf("could not aggregate module release metas for module catalog sync: %w", err)
}

remoteCatalog := remote.NewRemoteCatalogFromKyma(r.Client, r.SkrContextFactory, r.RemoteSyncNamespace)
if err := remoteCatalog.Sync(ctx, kyma.GetNamespacedName(), modulesToSync); err != nil {
return fmt.Errorf("could not synchronize remote module catalog: %w", err)
if err := remoteCatalog.Sync(ctx, kyma.GetNamespacedName(), modulesToSync, moduleReleaseMetaList.Items); err != nil {
return err
}

return nil
Expand Down
75 changes: 72 additions & 3 deletions internal/remote/crd_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/kyma-project/lifecycle-manager/api/v1beta2"
"github.com/kyma-project/lifecycle-manager/internal/crd"
"github.com/kyma-project/lifecycle-manager/internal/util/collections"
"github.com/kyma-project/lifecycle-manager/pkg/util"
)

type SyncCrdsUseCase struct {
Expand Down Expand Up @@ -47,11 +49,12 @@ func (s *SyncCrdsUseCase) Execute(ctx context.Context, kyma *v1beta2.Kyma) (bool
if err != nil {
return false, fmt.Errorf("failed to get SKR context: %w", err)
}

kymaCrdUpdated, err := s.fetchCrdsAndUpdateKymaAnnotations(ctx, skrContext.Client, kyma, shared.KymaKind.Plural())
if err != nil {
err = client.IgnoreNotFound(err)
if err != nil {
return false, fmt.Errorf("failed to fetch module template CRDs and update Kyma annotations: %w", err)
return false, fmt.Errorf("failed to fetch Kyma CRDs and update Kyma annotations: %w", err)
}
}

Expand All @@ -60,11 +63,20 @@ func (s *SyncCrdsUseCase) Execute(ctx context.Context, kyma *v1beta2.Kyma) (bool
if err != nil {
err = client.IgnoreNotFound(err)
if err != nil {
return false, fmt.Errorf("failed to fetch kyma CRDs and update Kyma annotations: %w", err)
return false, fmt.Errorf("failed to fetch ModuleTemplate CRDs and update Kyma annotations: %w", err)
}
}

moduleReleaseMetaCrdUpdated, err := s.fetchCrdsAndUpdateKymaAnnotations(ctx, skrContext.Client, kyma,
shared.ModuleReleaseMetaKind.Plural())
if err != nil {
err = client.IgnoreNotFound(err)
if err != nil {
return false, fmt.Errorf("failed to fetch ModuleReleaseMeta CRDs and update Kyma annotations: %w", err)
}
}

return kymaCrdUpdated || moduleTemplateCrdUpdated, nil
return kymaCrdUpdated || moduleTemplateCrdUpdated || moduleReleaseMetaCrdUpdated, nil
}

func PatchCRD(ctx context.Context, clnt client.Client, crd *apiextensionsv1.CustomResourceDefinition) error {
Expand Down Expand Up @@ -226,3 +238,60 @@ func cannotFoundResource(err error) bool {
}
return false
}

func crdReady(crd *apiextensionsv1.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions {
if cond.Type == apiextensionsv1.Established &&
cond.Status == apiextensionsv1.ConditionTrue {
return true
}

if cond.Type == apiextensionsv1.NamesAccepted &&
cond.Status == apiextensionsv1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to fail because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
return false
}

func containsCRDNotFoundError(errs []error) bool {
for _, err := range errs {
unwrappedError := errors.Unwrap(err)
if meta.IsNoMatchError(unwrappedError) || CRDNotFoundErr(unwrappedError) {
return true
}
}
return false
}

func createCRDInRuntime(ctx context.Context, crdKind shared.Kind, crdNotReadyErr error, kcpClient client.Client, skrClient client.Client) error {
kcpCrd := &apiextensionsv1.CustomResourceDefinition{}
skrCrd := &apiextensionsv1.CustomResourceDefinition{}
objKey := client.ObjectKey{
Name: fmt.Sprintf("%s.%s", crdKind.Plural(), v1beta2.GroupVersion.Group),
}
err := kcpClient.Get(ctx, objKey, kcpCrd)
if err != nil {
return fmt.Errorf("failed to get %s CRD from KCP: %w", string(crdKind), err)
}

err = skrClient.Get(ctx, objKey, skrCrd)

if util.IsNotFound(err) || !ContainsLatestVersion(skrCrd, v1beta2.GroupVersion.Version) {
return PatchCRD(ctx, skrClient, kcpCrd)
}

if !crdReady(skrCrd) {
return crdNotReadyErr
}

if err != nil {
return fmt.Errorf("failed to get %s CRD from SKR: %w", string(crdKind), err)
}

return nil
}
113 changes: 113 additions & 0 deletions internal/remote/modulereleasemeta_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package remote

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kyma-project/lifecycle-manager/api/v1beta2"
"github.com/kyma-project/lifecycle-manager/internal/util/collections"
"github.com/kyma-project/lifecycle-manager/pkg/util"
)

// moduleReleaseMetaSyncWorker is an interface for worker synchronizing ModuleReleaseMetas from KCP to SKR.
type moduleReleaseMetaSyncWorker interface {
SyncConcurrently(ctx context.Context, kcpModules []v1beta2.ModuleReleaseMeta) error
DeleteConcurrently(ctx context.Context, runtimeModules []v1beta2.ModuleReleaseMeta) error
}

// moduleReleaseMetaSyncWorkerFactory is a factory function for creating new moduleReleaseMetaSyncWorker instance.
type moduleReleaseMetaSyncWorkerFactory func(kcpClient, skrClient client.Client, settings *Settings) moduleReleaseMetaSyncWorker

// moduleReleaseMetaSyncer provides a top-level API for synchronizing ModuleReleaseMetas from KCP to SKR.
// It expects a ready-to-use client to the KCP and SKR cluster.
type moduleReleaseMetaSyncer struct {
kcpClient client.Client
skrClient client.Client
settings *Settings
syncWorkerFactoryFn moduleReleaseMetaSyncWorkerFactory
}

func newModuleReleaseMetaSyncer(kcpClient, skrClient client.Client, settings *Settings) *moduleReleaseMetaSyncer {
var syncWokerFactoryFn moduleReleaseMetaSyncWorkerFactory = func(kcpClient, skrClient client.Client, settings *Settings) moduleReleaseMetaSyncWorker {
return newModuleReleaseMetaConcurrentWorker(kcpClient, skrClient, settings)
}

return &moduleReleaseMetaSyncer{
kcpClient: kcpClient,
skrClient: skrClient,
settings: settings,
syncWorkerFactoryFn: syncWokerFactoryFn,
}
}

// SyncToSKR first lists all currently available ModuleReleaseMetas in the Runtime.
// If there is a NoMatchError, it will attempt to install the CRD but only if there are available crs to copy.
// It will use a 2 stage process:
// 1. All ModuleReleaseMeta that have to be created based on the ModuleReleaseMetas existing in the Control Plane.
// 2. All ModuleReleaseMeta that have to be removed as they are not existing in the Control Plane.
// It uses Server-Side-Apply Patches to optimize the turnaround required.
func (mts *moduleReleaseMetaSyncer) SyncToSKR(ctx context.Context, kcpModuleReleases []v1beta2.ModuleReleaseMeta) error {
worker := mts.syncWorkerFactoryFn(mts.kcpClient, mts.skrClient, mts.settings)

if err := worker.SyncConcurrently(ctx, kcpModuleReleases); err != nil {
return err
}

runtimeModuleReleases := &v1beta2.ModuleReleaseMetaList{}
if err := mts.skrClient.List(ctx, runtimeModuleReleases); err != nil {
// it can happen that the ModuleReleaseMeta CRD is not caught during to apply if there are no objects to apply
// if this is the case and there is no CRD there can never be any ModuleReleaseMetas to delete
if meta.IsNoMatchError(err) {
return nil
}
return fmt.Errorf("failed to list ModuleReleaseMetas from runtime: %w", err)
}

diffsToDelete := moduleReleaseMetasDiffFor(runtimeModuleReleases.Items).NotExistingIn(kcpModuleReleases)
diffsToDelete = collections.FilterInPlace(diffsToDelete, isModuleReleaseMetaManagedByKcp)
return worker.DeleteConcurrently(ctx, collections.Dereference(diffsToDelete))
}

// DeleteAllManaged deletes all ModuleReleaseMetas managed by KLM from the SKR cluster.
func (mts *moduleReleaseMetaSyncer) DeleteAllManaged(ctx context.Context) error {
moduleReleaseMetasRuntime := &v1beta2.ModuleReleaseMetaList{Items: []v1beta2.ModuleReleaseMeta{}}
if err := mts.skrClient.List(ctx, moduleReleaseMetasRuntime); err != nil {
// if there is no CRD or no ModuleReleaseMeta exists,
// there can never be any ModuleReleaseMeta to delete
if util.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to list ModuleReleaseMeta from skr: %w", err)
}
for i := range moduleReleaseMetasRuntime.Items {
if isModuleReleaseMetaManagedByKcp(&moduleReleaseMetasRuntime.Items[i]) {
if err := mts.skrClient.Delete(ctx, &moduleReleaseMetasRuntime.Items[i]); err != nil &&
!util.IsNotFound(err) {
return fmt.Errorf("failed to delete ModuleReleaseMeta from skr: %w", err)
}
}
}
return nil
}

// moduleReleaseMetasDiffFor returns a diffCalc for ModuleReleaseMeta objects.
func moduleReleaseMetasDiffFor(first []v1beta2.ModuleReleaseMeta) *collections.DiffCalc[v1beta2.ModuleReleaseMeta] {
return &collections.DiffCalc[v1beta2.ModuleReleaseMeta]{
First: first,
Identity: func(obj v1beta2.ModuleReleaseMeta) string {
return obj.Namespace + obj.Name
},
}
}

func isModuleReleaseMetaManagedByKcp(skrObject *v1beta2.ModuleReleaseMeta) bool {
for _, managedFieldEntry := range skrObject.ObjectMeta.ManagedFields {
if managedFieldEntry.Manager == moduleCatalogSyncFieldManager {
return true
}
}
return false
}
Loading
Loading