Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modulereleasemeta catalog sync #2014

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/controller/kyma/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,15 @@ func (r *Reconciler) syncModuleCatalog(ctx context.Context, kyma *v1beta2.Kyma)
modulesToSync = append(modulesToSync, mt)
}
}

moduleReleaseMetaList := &v1beta2.ModuleReleaseMetaList{}
if err := r.List(ctx, moduleReleaseMetaList, &client.ListOptions{}); err != nil {
return fmt.Errorf("could not aggregate module release metas for module catalog sync: %w", err)
}

remoteCatalog := remote.NewRemoteCatalogFromKyma(r.Client, r.SkrContextFactory, r.RemoteSyncNamespace)
if err := remoteCatalog.Sync(ctx, kyma.GetNamespacedName(), modulesToSync); err != nil {
return fmt.Errorf("could not synchronize remote module catalog: %w", err)
if err := remoteCatalog.Sync(ctx, kyma.GetNamespacedName(), modulesToSync, moduleReleaseMetaList.Items); err != nil {
return err
}

return nil
Expand Down
37 changes: 37 additions & 0 deletions internal/remote/crd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package remote

import (
"errors"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
)

func crdReady(crd *apiextensionsv1.CustomResourceDefinition) bool {
nesmabadr marked this conversation as resolved.
Show resolved Hide resolved
for _, cond := range crd.Status.Conditions {
if cond.Type == apiextensionsv1.Established &&
cond.Status == apiextensionsv1.ConditionTrue {
return true
}

if cond.Type == apiextensionsv1.NamesAccepted &&
cond.Status == apiextensionsv1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to fail because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
return false
}

func containsCRDNotFoundError(errs []error) bool {
for _, err := range errs {
unwrappedError := errors.Unwrap(err)
if meta.IsNoMatchError(unwrappedError) || CRDNotFoundErr(unwrappedError) {
return true
}
}
return false
}
114 changes: 114 additions & 0 deletions internal/remote/modulereleasemeta_syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package remote

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kyma-project/lifecycle-manager/api/v1beta2"
"github.com/kyma-project/lifecycle-manager/internal/util/collections"
"github.com/kyma-project/lifecycle-manager/pkg/util"
)

// moduleReleaseMetaSyncWorker is an interface for worker synchronizing ModuleReleaseMetas from KCP to SKR.
type moduleReleaseMetaSyncWorker interface {
SyncConcurrently(ctx context.Context, kcpModules []v1beta2.ModuleReleaseMeta) error
DeleteConcurrently(ctx context.Context, runtimeModules []v1beta2.ModuleReleaseMeta) error
}

// moduleReleaseMetaSyncWorkerFactory is a factory function for creating new moduleReleaseMetaSyncWorker instance.
type moduleReleaseMetaSyncWorkerFactory func(kcpClient, skrClient client.Client, settings *Settings) moduleReleaseMetaSyncWorker

// moduleReleaseMetaSyncer provides a top-level API for synchronizing ModuleReleaseMetas from KCP to SKR.
// It expects a ready-to-use client to the KCP and SKR cluster.
type moduleReleaseMetaSyncer struct {
kcpClient client.Client
skrClient client.Client
settings *Settings
syncWorkerFactoryFn moduleReleaseMetaSyncWorkerFactory
}

func newModuleReleaseMetaSyncer(kcpClient, skrClient client.Client, settings *Settings) *moduleReleaseMetaSyncer {
var syncWokerFactoryFn moduleReleaseMetaSyncWorkerFactory = func(kcpClient, skrClient client.Client, settings *Settings) moduleReleaseMetaSyncWorker {
return newModuleReleaseMetaConcurrentWorker(kcpClient, skrClient, settings)
}

return &moduleReleaseMetaSyncer{
kcpClient: kcpClient,
skrClient: skrClient,
settings: settings,
syncWorkerFactoryFn: syncWokerFactoryFn,
}
}

// SyncToSKR first lists all currently available ModuleReleaseMetas in the Runtime.
// If there is a NoMatchError, it will attempt to install the CRD but only if there are available crs to copy.
// It will use a 2 stage process:
// 1. All ModuleReleaseMeta that have to be created based on the ModuleReleaseMetas existing in the Control Plane.
// 2. All ModuleReleaseMeta that have to be removed as they are not existing in the Control Plane.
// It uses Server-Side-Apply Patches to optimize the turnaround required.
func (mts *moduleReleaseMetaSyncer) SyncToSKR(ctx context.Context, kyma types.NamespacedName, kcpModuleReleases []v1beta2.ModuleReleaseMeta) error {
nesmabadr marked this conversation as resolved.
Show resolved Hide resolved
worker := mts.syncWorkerFactoryFn(mts.kcpClient, mts.skrClient, mts.settings)

if err := worker.SyncConcurrently(ctx, kcpModuleReleases); err != nil {
return err
}

runtimeModuleReleases := &v1beta2.ModuleReleaseMetaList{}
if err := mts.skrClient.List(ctx, runtimeModuleReleases); err != nil {
// it can happen that the ModuleReleaseMeta CRD is not caught during to apply if there are no objects to apply
// if this is the case and there is no CRD there can never be any ModuleReleaseMetas to delete
if meta.IsNoMatchError(err) {
return nil
}
return fmt.Errorf("failed to list ModuleReleaseMetas from runtime: %w", err)
}

diffsToDelete := moduleReleaseMetasDiffFor(runtimeModuleReleases.Items).NotExistingIn(kcpModuleReleases)
diffsToDelete = collections.FilterInPlace(diffsToDelete, isModuleReleaseMetaManagedByKcp)
return worker.DeleteConcurrently(ctx, collections.Dereference(diffsToDelete))
}

// DeleteAllManaged deletes all ModuleReleaseMetas managed by KLM from the SKR cluster.
func (mts *moduleReleaseMetaSyncer) DeleteAllManaged(ctx context.Context, kyma types.NamespacedName) error {
moduleReleaseMetasRuntime := &v1beta2.ModuleReleaseMetaList{Items: []v1beta2.ModuleReleaseMeta{}}
if err := mts.skrClient.List(ctx, moduleReleaseMetasRuntime); err != nil {
// if there is no CRD or no ModuleReleaseMeta exists,
// there can never be any ModuleReleaseMeta to delete
if util.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to list ModuleReleaseMeta from skr: %w", err)
}
for i := range moduleReleaseMetasRuntime.Items {
if isModuleReleaseMetaManagedByKcp(&moduleReleaseMetasRuntime.Items[i]) {
if err := mts.skrClient.Delete(ctx, &moduleReleaseMetasRuntime.Items[i]); err != nil &&
!util.IsNotFound(err) {
return fmt.Errorf("failed to delete ModuleReleaseMeta from skr: %w", err)
}
}
}
return nil
}

// moduleReleaseMetasDiffFor returns a diffCalc for ModuleReleaseMeta objects.
func moduleReleaseMetasDiffFor(first []v1beta2.ModuleReleaseMeta) *collections.DiffCalc[v1beta2.ModuleReleaseMeta] {
return &collections.DiffCalc[v1beta2.ModuleReleaseMeta]{
First: first,
Identity: func(obj v1beta2.ModuleReleaseMeta) string {
return obj.Namespace + obj.Name
},
}
}

func isModuleReleaseMetaManagedByKcp(skrObject *v1beta2.ModuleReleaseMeta) bool {
for _, managedFieldEntry := range skrObject.ObjectMeta.ManagedFields {
if managedFieldEntry.Manager == moduleCatalogSyncFieldManager {
return true
}
}
return false
}
181 changes: 181 additions & 0 deletions internal/remote/modulereleasemeta_syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
//nolint:testpackage // this file tests unexported types of the package
package remote

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/kyma-project/lifecycle-manager/api/v1beta2"
)

// TestModuleReleaseMetaSyncer_SyncToSKR_happypath tests the happy path of the SyncToSKR method,
// with some ModuleReleseMetas to be installed in the SKR and some objects to be deleted from the SKR.
func TestModuleReleaseMetaSyncer_SyncToSKR_happypath(t *testing.T) { //nolint:dupl // duplication will be removed: https://github.com/kyma-project/lifecycle-manager/issues/2015
// given
mrmKCP1 := moduleReleaseMeta("mrm1", "kcp-system") // this one should be installed in the SKR, because it's not there
mrmKCP2 := moduleReleaseMeta("mrm2", "kcp-system")
mrmKCP3 := moduleReleaseMeta("mrm3", "kcp-system")

mrmSKR2 := moduleReleaseMeta("mrm2", "kyma-system")
mrmSKR3 := moduleReleaseMeta("mrm3", "kyma-system")
mrmSKR4 := moduleReleaseMeta("mrm4", "kyma-system") // this one should be deleted, because it's not in the KCP

// Create a fake client with the SKR objects
scheme, err := v1beta2.SchemeBuilder.Build()
require.NoError(t, err)
skrClient := fake.NewClientBuilder().
WithObjects(&mrmSKR2, &mrmSKR3, &mrmSKR4).
WithScheme(scheme).
Build()

// onSyncConcurrentlyFn "pretends" to be the moduleReleaseMetaConcurrentWorker.SyncConcurrently
nesmabadr marked this conversation as resolved.
Show resolved Hide resolved
onSyncConcurrentlyFn := func(_ context.Context, kcpModules []v1beta2.ModuleReleaseMeta) {
if len(kcpModules) != 3 {
t.Errorf("Expected 3 kcp modules, got %d", len(kcpModules))
}
if kcpModules[0].Name != "mrm1" {
t.Errorf("Expected module mrm1, got %s", kcpModules[0].Name)
}
if kcpModules[1].Name != "mrm2" {
t.Errorf("Expected module mrm2, got %s", kcpModules[1].Name)
}
if kcpModules[2].Name != "mrm3" {
t.Errorf("Expected module mrm3, got %s", kcpModules[2].Name)
}
}

// onDeleteConcurrentlyFn "pretends" to be the moduleReleaseMetaConcurrentWorker.DeleteConcurrently
onDeleteConcurrentlyFn := func(_ context.Context, runtimeModules []v1beta2.ModuleReleaseMeta) {
if len(runtimeModules) != 1 {
t.Errorf("Expected 1 runtime module, got %d", len(runtimeModules))
}
if runtimeModules[0].Name != "mrm4" {
t.Errorf("Expected module mrm4, got %s", runtimeModules[0].Name)
}
}

syncWokerFactoryFn := func(kcpClient, skrClient client.Client, settings *Settings) moduleReleaseMetaSyncWorker {
return &fakeModuleReleaseMetaSyncWorker{
namespace: settings.Namespace,
onSyncConcurrently: onSyncConcurrentlyFn,
onDeleteConcurrently: onDeleteConcurrentlyFn,
}
}

subject := moduleReleaseMetaSyncer{
skrClient: skrClient,
settings: getSettings(),
syncWorkerFactoryFn: syncWokerFactoryFn,
}

// when
err = subject.SyncToSKR(context.Background(), types.NamespacedName{Name: "kyma", Namespace: "kcp-system"}, []v1beta2.ModuleReleaseMeta{mrmKCP1, mrmKCP2, mrmKCP3})

// then
assert.NoError(t, err)
}

// TestSyncer_SyncToSKR_nilList tests the case when the list of KCP modules is nil.
func TestModuleReleaseMetaSyncer_SyncToSKR_nilList(t *testing.T) {
// given
mtSKR2 := moduleReleaseMeta("mrm2", "kyma-system") // should be deleted, because it's not in the KCP
mtSKR3 := moduleReleaseMeta("mrm3", "kyma-system") // should be deleted, because it's not in the KCP
mtSKR4 := moduleReleaseMeta("mrm4", "kyma-system") // should be deleted, because it's not in the KCP

// Create a fake client with the SKR modules
scheme, err := v1beta2.SchemeBuilder.Build()
require.NoError(t, err)
skrClient := fake.NewClientBuilder().
WithObjects(&mtSKR2, &mtSKR3, &mtSKR4).
WithScheme(scheme).
Build()

// onSyncConcurrentlyFn "pretends" to be the moduleReleaseMetaConcurrentWorker.SyncConcurrently
onSyncConcurrentlyFn := func(_ context.Context, kcpModules []v1beta2.ModuleReleaseMeta) {
if kcpModules != nil {
t.Errorf("Expected nil kcp modules, got %v", kcpModules)
}
}

// onDeleteConcurrentlyFn "pretends" to be the moduleReleaseMetaConcurrentWorker.DeleteConcurrently
onDeleteConcurrentlyFn := func(_ context.Context, runtimeModules []v1beta2.ModuleReleaseMeta) {
if len(runtimeModules) != 3 {
t.Errorf("Expected 3 runtime module, got %d", len(runtimeModules))
}
if runtimeModules[0].Name != "mrm2" {
t.Errorf("Expected module mt2, got %s", runtimeModules[0].Name)
}
if runtimeModules[1].Name != "mrm3" {
t.Errorf("Expected module mt2, got %s", runtimeModules[1].Name)
}
if runtimeModules[2].Name != "mrm4" {
t.Errorf("Expected module mt2, got %s", runtimeModules[2].Name)
}
}

syncWokerFactoryFn := func(kcpClient, skrClient client.Client, settings *Settings) moduleReleaseMetaSyncWorker {
return &fakeModuleReleaseMetaSyncWorker{
namespace: settings.Namespace,
onSyncConcurrently: onSyncConcurrentlyFn,
onDeleteConcurrently: onDeleteConcurrentlyFn,
}
}

subject := moduleReleaseMetaSyncer{
skrClient: skrClient,
settings: getSettings(),
syncWorkerFactoryFn: syncWokerFactoryFn,
}

// when
var nilModuleReleaseMetaList []v1beta2.ModuleReleaseMeta = nil
err = subject.SyncToSKR(context.Background(), types.NamespacedName{Name: "kyma", Namespace: "kcp-system"}, nilModuleReleaseMetaList)

// then
assert.NoError(t, err)
}

func moduleReleaseMeta(name, namespace string) v1beta2.ModuleReleaseMeta {
return v1beta2.ModuleReleaseMeta{
ObjectMeta: apimetav1.ObjectMeta{
Name: name,
Namespace: namespace,
ManagedFields: []apimetav1.ManagedFieldsEntry{
{
Manager: moduleCatalogSyncFieldManager,
},
},
},
}
}

// Implements the syncWorker interface.
type fakeModuleReleaseMetaSyncWorker struct {
namespace string
onSyncConcurrently func(ctx context.Context, kcpModules []v1beta2.ModuleReleaseMeta)
onDeleteConcurrently func(ctx context.Context, runtimeModules []v1beta2.ModuleReleaseMeta)
}

func (f *fakeModuleReleaseMetaSyncWorker) SyncConcurrently(ctx context.Context, kcpModules []v1beta2.ModuleReleaseMeta) error {
f.onSyncConcurrently(ctx, kcpModules)

// Simulate namespace switch on modules in kcpModules list that happens in moduleReleaseMetaConcurrentWorker.SyncConcurrently
// This is necessary for proper diff calculation later in the process.
for i := range kcpModules {
prepareModuleReleaseMetaForSSA(&kcpModules[i], f.namespace)
}

return nil
}

func (f *fakeModuleReleaseMetaSyncWorker) DeleteConcurrently(ctx context.Context, runtimeModules []v1beta2.ModuleReleaseMeta) error {
f.onDeleteConcurrently(ctx, runtimeModules)
return nil
}
Loading
Loading