From b8eccd4ace0cf29073db45e2fa2019f6d3aadca5 Mon Sep 17 00:00:00 2001 From: Or Ozeri Date: Tue, 16 Apr 2024 09:35:43 +0300 Subject: [PATCH] v1alpha1.Import: Support merging with an existing service This commit adds a new option for Imports to be merged with an existing K8s service. This works by adding an endpoint slices to the exsting service, which points to the clusterlink dataplane service. Signed-off-by: Or Ozeri --- cmd/cl-controlplane/app/server.go | 41 +- cmd/gwctl/subcommand/import.go | 10 +- config/operator/rbac/role.yaml | 12 + pkg/apis/clusterlink.net/v1alpha1/import.go | 6 +- pkg/bootstrap/platform/k8s.go | 3 + pkg/controlplane/control/controllers.go | 107 ++-- pkg/controlplane/control/manager.go | 506 +++++++++++++++--- pkg/controlplane/control/port.go | 5 + pkg/controlplane/rest/import.go | 29 +- pkg/controlplane/store/types.go | 3 + .../controller/instance_controller.go | 8 + tests/e2e/k8s/services/httpecho/client.go | 10 + tests/e2e/k8s/test_import.go | 157 +++++- tests/e2e/k8s/util/clusterlink.go | 14 +- tests/e2e/k8s/util/fabric.go | 13 + tests/e2e/k8s/util/kind.go | 10 + 16 files changed, 777 insertions(+), 157 deletions(-) diff --git a/cmd/cl-controlplane/app/server.go b/cmd/cl-controlplane/app/server.go index 37ff359d..f4f63832 100644 --- a/cmd/cl-controlplane/app/server.go +++ b/cmd/cl-controlplane/app/server.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" + discv1 "k8s.io/api/discovery/v1" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -150,25 +151,29 @@ func (o *Options) Run() error { return fmt.Errorf("unable to add core v1 objects to scheme: %w", err) } + if err := discv1.AddToScheme(scheme); err != nil { + return fmt.Errorf("unable to add discovery v1 objects to scheme: %w", err) + } + // set logger for controller-runtime components ctrl.SetLogger(logrusr.New(logrus.WithField("component", "k8s.controller-runtime"))) - // limit watch for v1alpha1.Peer to the namespace given by 'namespace' - managerOptions := manager.Options{} + managerOptions := manager.Options{ + Cache: cache.Options{ + ByObject: make(map[client.Object]cache.ByObject), + }, + Scheme: scheme, + } + + // limit watch for v1alpha1.Peer and EndpointSlice to the namespace given by 'namespace' if o.CRDMode { - managerOptions = manager.Options{ - Cache: cache.Options{ - ByObject: map[client.Object]cache.ByObject{ - &v1alpha1.Peer{}: { - Namespaces: map[string]cache.Config{ - namespace: {}, - }, - }, - }, + managerOptions.Cache.ByObject[&v1alpha1.Peer{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{ + namespace: {}, }, - Scheme: scheme, } } + mgr, err := manager.New(config, managerOptions) if err != nil { return fmt.Errorf( @@ -198,6 +203,11 @@ func (o *Options) Run() error { controlManager := control.NewManager(mgr.GetClient(), parsedCertData, namespace, o.CRDMode) + err = control.CreateControllers(controlManager, mgr, o.CRDMode) + if err != nil { + return fmt.Errorf("cannot create control controllers: %w", err) + } + xdsManager := xds.NewManager(o.CRDMode) xds.RegisterService( context.Background(), xdsManager, grpcServer.GetGRPCServer()) @@ -207,11 +217,6 @@ func (o *Options) Run() error { if err != nil { return fmt.Errorf("cannot create xDS controllers: %w", err) } - - err = control.CreateControllers(controlManager, mgr) - if err != nil { - return fmt.Errorf("cannot create control controllers: %w", err) - } } else { // open store kvStore, err := bolt.Open(StoreFile) @@ -235,6 +240,8 @@ func (o *Options) Run() error { cprest.RegisterHandlers(restManager, httpServer) + controlManager.SetGetMergeImportListCallback(restManager.GetMergeImportList) + controlManager.SetGetImportCallback(restManager.GetK8sImport) controlManager.SetStatusCallback(func(pr *v1alpha1.Peer) { authzManager.AddPeer(pr) }) diff --git a/cmd/gwctl/subcommand/import.go b/cmd/gwctl/subcommand/import.go index 0f9fcfe9..1ccfd634 100644 --- a/cmd/gwctl/subcommand/import.go +++ b/cmd/gwctl/subcommand/import.go @@ -32,6 +32,7 @@ type importOptions struct { name string port uint16 peers []string + merge bool } // ImportCreateCmd - create an imported service. @@ -76,6 +77,7 @@ func (o *importOptions) addFlags(fs *pflag.FlagSet) { fs.StringVar(&o.name, "name", "", "Imported service name") fs.Uint16Var(&o.port, "port", 0, "Imported service port") fs.StringSliceVar(&o.peers, "peer", []string{}, "Remote peer to import the service from") + fs.BoolVar(&o.merge, "merge", false, "Merge with an existing service endpoint") } // run performs the execution of the 'create import' or 'update import' subcommand. @@ -96,9 +98,15 @@ func (o *importOptions) run(isUpdate bool) error { sources[i].ExportName = o.name } + labels := make(map[string]string) + if o.merge { + labels[v1alpha1.LabelImportMerge] = "true" + } + err = importOperation(&v1alpha1.Import{ ObjectMeta: metav1.ObjectMeta{ - Name: o.name, + Name: o.name, + Labels: labels, }, Spec: v1alpha1.ImportSpec{ Port: o.port, diff --git a/config/operator/rbac/role.yaml b/config/operator/rbac/role.yaml index 90f6e6c9..0d269ec5 100644 --- a/config/operator/rbac/role.yaml +++ b/config/operator/rbac/role.yaml @@ -97,6 +97,18 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - rbac.authorization.k8s.io resources: diff --git a/pkg/apis/clusterlink.net/v1alpha1/import.go b/pkg/apis/clusterlink.net/v1alpha1/import.go index bdb9f642..d9b140d9 100644 --- a/pkg/apis/clusterlink.net/v1alpha1/import.go +++ b/pkg/apis/clusterlink.net/v1alpha1/import.go @@ -60,8 +60,10 @@ type ImportSpec struct { const ( // ImportTargetPortValid is a condition type for indicating whether the import target port is valid. ImportTargetPortValid string = "ImportTargetPortValid" - // ImportServiceCreated is a condition type for indicating whether the import service was successfully created. - ImportServiceCreated string = "ImportServiceCreated" + // ImportServiceValid is a condition type for indicating whether the import service exists and valid. + ImportServiceValid string = "ImportServiceValid" + + LabelImportMerge string = "import.clusterlink.net/merge" ) // ImportStatus represents the status of an imported service. diff --git a/pkg/bootstrap/platform/k8s.go b/pkg/bootstrap/platform/k8s.go index 4bd68fea..7bb59dcd 100644 --- a/pkg/bootstrap/platform/k8s.go +++ b/pkg/bootstrap/platform/k8s.go @@ -274,6 +274,9 @@ rules: - apiGroups: [""] resources: ["services"] verbs: ["get", "list", "watch", "create", "delete", "update"] +- apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["get", "list", "watch", "create", "delete", "update"] - apiGroups: [""] resources: ["pods"] verbs: ["get", "list", "watch"] diff --git a/pkg/controlplane/control/controllers.go b/pkg/controlplane/control/controllers.go index 93364634..dc1ae37c 100644 --- a/pkg/controlplane/control/controllers.go +++ b/pkg/controlplane/control/controllers.go @@ -18,6 +18,8 @@ import ( "github.com/clusterlink-net/clusterlink/pkg/util/controller" v1 "k8s.io/api/core/v1" + discv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" @@ -25,57 +27,72 @@ import ( ) // CreateControllers creates the various k8s controllers used to update the control manager. -func CreateControllers(mgr *Manager, controllerManager ctrl.Manager) error { - err := controller.AddToManager(controllerManager, &controller.Spec{ - Name: "control.peer", - Object: &v1alpha1.Peer{}, - AddHandler: func(ctx context.Context, object any) error { - mgr.AddPeer(object.(*v1alpha1.Peer)) - return nil - }, - DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { - mgr.DeletePeer(name.Name) - return nil - }, - }) - if err != nil { - return err - } +func CreateControllers(mgr *Manager, controllerManager ctrl.Manager, crdMode bool) error { + if crdMode { + err := controller.AddToManager(controllerManager, &controller.Spec{ + Name: "control.peer", + Object: &v1alpha1.Peer{}, + AddHandler: func(ctx context.Context, object any) error { + mgr.AddPeer(object.(*v1alpha1.Peer)) + return nil + }, + DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { + mgr.DeletePeer(name.Name) + return nil + }, + }) + if err != nil { + return err + } + err = controller.AddToManager(controllerManager, &controller.Spec{ + Name: "control.service", + Object: &v1.Service{}, + AddHandler: func(ctx context.Context, object any) error { + return mgr.addService(ctx, object.(*v1.Service)) + }, + DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { + return mgr.deleteService(ctx, name) + }, + }) + if err != nil { + return err + } - err = controller.AddToManager(controllerManager, &controller.Spec{ - Name: "control.service", - Object: &v1.Service{}, - AddHandler: func(ctx context.Context, object any) error { - return mgr.addService(ctx, object.(*v1.Service)) - }, - DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { - return mgr.deleteService(ctx, name) - }, - }) - if err != nil { - return err - } + err = controller.AddToManager(controllerManager, &controller.Spec{ + Name: "control.export", + Object: &v1alpha1.Export{}, + AddHandler: func(ctx context.Context, object any) error { + return mgr.addExport(ctx, object.(*v1alpha1.Export)) + }, + DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { + return nil + }, + }) + if err != nil { + return err + } - err = controller.AddToManager(controllerManager, &controller.Spec{ - Name: "control.export", - Object: &v1alpha1.Export{}, - AddHandler: func(ctx context.Context, object any) error { - return mgr.addExport(ctx, object.(*v1alpha1.Export)) - }, - DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { - return nil - }, - }) - if err != nil { - return err + err = controller.AddToManager(controllerManager, &controller.Spec{ + Name: "control.import", + Object: &v1alpha1.Import{}, + AddHandler: func(ctx context.Context, object any) error { + return mgr.AddImport(ctx, object.(*v1alpha1.Import)) + }, + DeleteHandler: mgr.DeleteImport, + }) + if err != nil { + return err + } } return controller.AddToManager(controllerManager, &controller.Spec{ - Name: "control.import", - Object: &v1alpha1.Import{}, + Name: "control.endpointslice", + Object: &discv1.EndpointSlice{}, AddHandler: func(ctx context.Context, object any) error { - return mgr.AddImport(ctx, object.(*v1alpha1.Import)) + return mgr.addEndpointSlice(ctx, object.(*discv1.EndpointSlice)) + }, + DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { + return mgr.deleteEndpointSlice(ctx, name) }, - DeleteHandler: mgr.DeleteImport, }) } diff --git a/pkg/controlplane/control/manager.go b/pkg/controlplane/control/manager.go index c57650ea..859cb194 100644 --- a/pkg/controlplane/control/manager.go +++ b/pkg/controlplane/control/manager.go @@ -15,6 +15,9 @@ package control import ( "context" + "reflect" + "strconv" + "strings" //nolint:gosec // G505: use of weak cryptographic primitive is fine for service name "crypto/md5" @@ -24,12 +27,13 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + discv1 "k8s.io/api/discovery/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/utils/strings" + k8sstrings "k8s.io/utils/strings" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -39,10 +43,15 @@ import ( ) const ( - AppName = "clusterlink.net" + AppName = "clusterlink.net" + + // service labels. LabelManagedBy = "app.kubernetes.io/managed-by" LabelImportName = "clusterlink.net/import-name" LabelImportNamespace = "clusterlink.net/import-namespace" + + // endpoint slice labels. + LabelDPEndpointSliceName = "clusterlink.net/dataplane-endpointslice-name" ) type exportServiceNotExistError struct { @@ -51,10 +60,30 @@ type exportServiceNotExistError struct { func (e exportServiceNotExistError) Error() string { return fmt.Sprintf( - "service '%s/%s' does not exist", + "export service '%s/%s' does not exist", e.name.Namespace, e.name.Name) } +func (e exportServiceNotExistError) Is(target error) bool { + _, ok := target.(*exportServiceNotExistError) + return ok +} + +type importServiceNotExistError struct { + name types.NamespacedName +} + +func (e importServiceNotExistError) Error() string { + return fmt.Sprintf( + "import service '%s/%s' does not exist", + e.name.Namespace, e.name.Name) +} + +func (e importServiceNotExistError) Is(target error) bool { + _, ok := target.(*importServiceNotExistError) + return ok +} + type conflictingServiceError struct { name types.NamespacedName managedBy string @@ -66,6 +95,39 @@ func (e conflictingServiceError) Error() string { e.name.Namespace, e.name.Name, e.managedBy) } +func (e conflictingServiceError) Is(target error) bool { + _, ok := target.(*conflictingServiceError) + return ok +} + +type importEndpointSliceName struct { + importName string + dataplaneEndpointSliceName string +} + +func (n *importEndpointSliceName) Get() string { + return fmt.Sprintf( + "clusterlink-%d-%s-%s", + len(n.importName), n.importName, n.dataplaneEndpointSliceName) +} + +func (n *importEndpointSliceName) Parse(importEndpointSliceName string) bool { + components := strings.SplitN(importEndpointSliceName, "-", 3) + if len(components) != 3 || components[0] != "clusterlink" { + return false + } + + importNameLength, err := strconv.Atoi(components[1]) + if err != nil || importNameLength <= 0 || importNameLength >= len(components[2]) { + return false + } + + n.importName = components[2][:importNameLength] + n.dataplaneEndpointSliceName = components[2][importNameLength+1:] + + return n.dataplaneEndpointSliceName != "" +} + // Manager is responsible for handling control operations, // which needs to be coordinated across all dataplane/controlplane instances. // This includes target port generation for imported services, as well as @@ -81,9 +143,22 @@ type Manager struct { lock sync.Mutex serviceToImport map[string]types.NamespacedName + // callback for getting all merge imports (for non-CRD mode) + getMergeImportListCallback func() *v1alpha1.ImportList + // callback for getting an import (for non-CRD mode) + getImportCallback func(name string, imp *v1alpha1.Import) error + logger *logrus.Entry } +func (m *Manager) SetGetMergeImportListCallback(callback func() *v1alpha1.ImportList) { + m.getMergeImportListCallback = callback +} + +func (m *Manager) SetGetImportCallback(callback func(name string, imp *v1alpha1.Import) error) { + m.getImportCallback = callback +} + // AddImport adds a listening socket for an imported remote service. func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) (err error) { m.logger.Infof("Adding import '%s/%s'.", imp.Namespace, imp.Name) @@ -98,22 +173,22 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) (err erro return } - serviceCreatedCond := &metav1.Condition{ - Type: v1alpha1.ImportServiceCreated, + serviceValidCond := &metav1.Condition{ + Type: v1alpha1.ImportServiceValid, Status: metav1.ConditionTrue, - Reason: "Created", + Reason: "Valid", } if err != nil { - serviceCreatedCond.Status = metav1.ConditionFalse - serviceCreatedCond.Reason = "Error" - serviceCreatedCond.Message = err.Error() + serviceValidCond.Status = metav1.ConditionFalse + serviceValidCond.Reason = "Error" + serviceValidCond.Message = err.Error() } conditions := &imp.Status.Conditions - if conditionChanged(conditions, serviceCreatedCond) || conditionChanged(conditions, targetPortValidCond) { + if conditionChanged(conditions, serviceValidCond) || conditionChanged(conditions, targetPortValidCond) { meta.SetStatusCondition(conditions, *targetPortValidCond) - meta.SetStatusCondition(conditions, *serviceCreatedCond) + meta.SetStatusCondition(conditions, *serviceValidCond) m.logger.Infof("Updating import '%s/%s' status: %v.", imp.Namespace, imp.Name, *conditions) statusError := m.client.Status().Update(ctx, imp) @@ -128,7 +203,9 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) (err erro } } - if errors.Is(err, &conflictingServiceError{}) || errors.Is(err, &conflictingTargetPortError{}) { + if errors.Is(err, &conflictingServiceError{}) || + errors.Is(err, &conflictingTargetPortError{}) || + errors.Is(err, &importServiceNotExistError{}) { err = reconcile.TerminalError(err) } }() @@ -143,12 +220,23 @@ func (m *Manager) AddImport(ctx context.Context, imp *v1alpha1.Import) (err erro targetPortValidCond.Status = metav1.ConditionTrue targetPortValidCond.Reason = "Leased" + if imp.Labels[v1alpha1.LabelImportMerge] == "true" { + return m.addImportEndpointSlices(ctx, imp) + } + + importName := types.NamespacedName{ + Namespace: imp.Namespace, + Name: imp.Name, + } + + // delete import endpoint slices, in case the import was previously a "merge: true" import + if err := m.deleteImportEndpointSlices(ctx, importName); err != nil { + return err + } + serviceName := imp.Name if imp.Namespace != m.namespace { - serviceName = SystemServiceName(types.NamespacedName{ - Namespace: imp.Namespace, - Name: imp.Name, - }) + serviceName = SystemServiceName(importName) } systemService := &v1.Service{ @@ -198,7 +286,7 @@ func (m *Manager) DeleteImport(ctx context.Context, name types.NamespacedName) e m.logger.Infof("Deleting import '%s/%s'.", name.Namespace, name.Name) // delete user service - errs := make([]error, 2) + errs := make([]error, 3) errs[0] = m.deleteImportService(ctx, name, name) if name.Namespace != m.namespace { @@ -210,22 +298,12 @@ func (m *Manager) DeleteImport(ctx context.Context, name types.NamespacedName) e errs[1] = m.deleteImportService(ctx, systemService, name) } - m.ports.Release(name) + // delete import endpoint slices + errs[2] = m.deleteImportEndpointSlices(ctx, name) - err := errors.Join(errs...) - if err != nil && m.crdMode { - // if all errors are conflictingServiceError, mark as TerminalError - // so that reconciler will not retry - for _, err2 := range errs { - if err2 != nil && !errors.Is(err2, &conflictingServiceError{}) { - return err - } - } - - err = reconcile.TerminalError(err) - } + m.ports.Release(name) - return err + return errors.Join(errs...) } // addExport defines a new route target for ingress dataplane connections. @@ -313,7 +391,73 @@ func (m *Manager) deleteService(ctx context.Context, name types.NamespacedName) return m.checkExportService(ctx, name) } +// addEndpointSlice adds a dataplane / import endpoint slices. +func (m *Manager) addEndpointSlice(ctx context.Context, endpointSlice *discv1.EndpointSlice) error { + if endpointSlice.Labels[discv1.LabelServiceName] == dpapp.Name && endpointSlice.Namespace == m.namespace { + m.logger.Infof("Adding a dataplane endpoint slice: %s", endpointSlice.Name) + + mergeImportList, err := m.getMergeImportList(ctx) + if err != nil { + return err + } + + mergeImports := &mergeImportList.Items + for i := range *mergeImports { + err := m.checkImportEndpointSlice(ctx, &(*mergeImports)[i], endpointSlice) + if err != nil { + return err + } + } + + return nil + } + + return m.checkEndpointSlice(ctx, endpointSlice.Namespace, endpointSlice.Name) +} + +// deleteEndpointSlice is used to track deleted dataplane endpoint slices. +func (m *Manager) deleteEndpointSlice(ctx context.Context, name types.NamespacedName) error { + if err := m.checkEndpointSlice(ctx, name.Namespace, name.Name); err != nil { + return err + } + + if name.Namespace != m.namespace { + // not a dataplane endpoint slice + return nil + } + + importsEndpointSliceList := discv1.EndpointSliceList{} + err := m.client.List( + ctx, + &importsEndpointSliceList, + client.MatchingLabels{ + discv1.LabelManagedBy: AppName, + LabelDPEndpointSliceName: name.Name, + }) + if err != nil { + return err + } + + importsEndpointSlices := &importsEndpointSliceList.Items + for i := range *importsEndpointSlices { + importEndpointSlice := &(*importsEndpointSlices)[i] + m.logger.Infof( + "Deleting import endpoint slice: %s/%s", + importEndpointSlice.Namespace, importEndpointSlice.Name) + err := m.client.Delete(ctx, importEndpointSlice) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + } + + return nil +} + func (m *Manager) checkExportService(ctx context.Context, name types.NamespacedName) error { + if !m.crdMode { + return nil + } + var export v1alpha1.Export if err := m.client.Get(ctx, name, &export); err != nil { if !k8serrors.IsNotFound(err) { @@ -328,7 +472,7 @@ func (m *Manager) checkExportService(ctx context.Context, name types.NamespacedN func (m *Manager) checkImportService(ctx context.Context, name types.NamespacedName) error { var imp v1alpha1.Import - if err := m.client.Get(ctx, name, &imp); err != nil { + if err := m.getImport(ctx, name, &imp); err != nil { if !k8serrors.IsNotFound(err) { return err } @@ -350,7 +494,7 @@ func (m *Manager) checkImportService(ctx context.Context, name types.NamespacedN return nil } - if err := m.client.Get(ctx, name, &imp); err != nil { + if err := m.getImport(ctx, name, &imp); err != nil { if !k8serrors.IsNotFound(err) { return err } @@ -392,23 +536,24 @@ func (m *Manager) addImportService(ctx context.Context, imp *v1alpha1.Import, se service.Labels[LabelImportName] = imp.Name service.Labels[LabelImportNamespace] = imp.Namespace + importName := types.NamespacedName{ + Namespace: imp.Namespace, + Name: imp.Name, + } + if imp.Namespace != service.Namespace { m.lock.Lock() - m.serviceToImport[service.Name] = types.NamespacedName{ - Namespace: imp.Namespace, - Name: imp.Namespace, - } + m.serviceToImport[service.Name] = importName m.lock.Unlock() } + serviceName := types.NamespacedName{ + Name: service.Name, + Namespace: service.Namespace, + } + var oldService v1.Service - err := m.client.Get( - ctx, - types.NamespacedName{ - Name: service.Name, - Namespace: service.Namespace, - }, - &oldService) + err := m.client.Get(ctx, serviceName, &oldService) if err != nil { if !k8serrors.IsNotFound(err) { return err @@ -418,11 +563,11 @@ func (m *Manager) addImportService(ctx context.Context, imp *v1alpha1.Import, se return m.client.Create(ctx, service) } - if err := checkServiceLabels(&oldService, types.NamespacedName{ - Namespace: imp.Namespace, - Name: imp.Name, - }); err != nil { - return err + if !checkServiceLabels(&oldService, importName) { + return conflictingServiceError{ + name: serviceName, + managedBy: oldService.Labels[LabelManagedBy], + } } if !serviceChanged(&oldService, service) { @@ -445,8 +590,8 @@ func (m *Manager) deleteImportService(ctx context.Context, service, imp types.Na return nil } - if err := checkServiceLabels(&oldService, imp); err != nil { - return err + if !checkServiceLabels(&oldService, imp) { + return nil } m.logger.Infof("Deleting service: %v.", service) @@ -469,46 +614,235 @@ func (m *Manager) deleteImportService(ctx context.Context, service, imp types.Na return nil } -func checkServiceLabels(service *v1.Service, importName types.NamespacedName) error { - serviceName := types.NamespacedName{ - Namespace: service.Namespace, - Name: service.Name, +func (m *Manager) checkEndpointSlice(ctx context.Context, namespace, endpointSliceName string) error { + var parsed importEndpointSliceName + if !parsed.Parse(endpointSliceName) { + return nil } - var managedBy string - var ok bool - if managedBy, ok = service.Labels[LabelManagedBy]; !ok || managedBy != AppName { - return conflictingServiceError{ - name: serviceName, - managedBy: managedBy, + m.logger.Infof( + "Checking import endpoint slice %s/%s'.", + namespace, endpointSliceName) + + var imp v1alpha1.Import + var dataplaneEndpointSlice discv1.EndpointSlice + shouldDelete := false + if err := m.getImport(ctx, types.NamespacedName{ + Namespace: namespace, + Name: parsed.importName, + }, &imp); err != nil { + if !k8serrors.IsNotFound(err) { + return err } + + m.logger.Infof( + "Deleting an import endpoint slice with no corresponding import: %s", + endpointSliceName) + shouldDelete = true + } else if imp.Labels[v1alpha1.LabelImportMerge] != "true" { + m.logger.Infof( + "Deleting an import endpoint slice with no corresponding merge-type import: %s", + endpointSliceName) + + shouldDelete = true + } else if err := m.client.Get(ctx, types.NamespacedName{ + Namespace: m.namespace, + Name: parsed.dataplaneEndpointSliceName, + }, &dataplaneEndpointSlice); err != nil { + if !k8serrors.IsNotFound(err) { + return err + } + + m.logger.Infof( + "Deleting an import endpoint slice with no corresponding dataplane endpoint slice: %s", + endpointSliceName) + shouldDelete = true } - if name, ok := service.Labels[LabelImportName]; !ok || name != importName.Name { - return conflictingServiceError{ - name: serviceName, - managedBy: managedBy, + if shouldDelete { + err := m.client.Delete(ctx, &discv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: endpointSliceName, + Namespace: namespace, + }, + }) + if err != nil && !k8serrors.IsNotFound(err) { + return err } + + return nil } - if namespace, ok := service.Labels[LabelImportNamespace]; !ok || namespace != importName.Namespace { - return conflictingServiceError{ - name: serviceName, - managedBy: managedBy, + return m.checkImportEndpointSlice(ctx, &imp, &dataplaneEndpointSlice) +} + +func (m *Manager) checkImportEndpointSlice( + ctx context.Context, + imp *v1alpha1.Import, + dataplaneEndpointSlice *discv1.EndpointSlice, +) error { + m.logger.Infof( + "Checking endpoint slice %s for import %s/%s'.", + dataplaneEndpointSlice.Name, imp.Namespace, imp.Name) + + importEndpointSliceName := (&importEndpointSliceName{ + importName: imp.Name, + dataplaneEndpointSliceName: dataplaneEndpointSlice.Name, + }).Get() + protocol := v1.ProtocolTCP + port32 := int32(imp.Spec.TargetPort) + + importEndpointSlice := discv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: importEndpointSliceName, + Namespace: imp.Namespace, + Labels: map[string]string{ + discv1.LabelServiceName: imp.Name, + discv1.LabelManagedBy: AppName, + LabelDPEndpointSliceName: dataplaneEndpointSlice.Name, + }, + }, + AddressType: discv1.AddressTypeIPv4, + Endpoints: dataplaneEndpointSlice.Endpoints, + Ports: []discv1.EndpointPort{ + { + Port: &port32, + Protocol: &protocol, + }, + }, + } + + var oldImportEndpointSlice discv1.EndpointSlice + err := m.client.Get( + ctx, + types.NamespacedName{ + Name: importEndpointSliceName, + Namespace: imp.Namespace, + }, + &oldImportEndpointSlice) + if err != nil { + if !k8serrors.IsNotFound(err) { + return err + } + + m.logger.Infof("Creating import endpoint slice: %s.", importEndpointSliceName) + return m.client.Create(ctx, &importEndpointSlice) + } + + if !endpointSliceChanged(&importEndpointSlice, &oldImportEndpointSlice) { + return nil + } + + m.logger.Infof("Updating import endpoint slice: %s.", importEndpointSliceName) + return m.client.Update(ctx, &importEndpointSlice) +} + +func (m *Manager) addImportEndpointSlices(ctx context.Context, imp *v1alpha1.Import) error { + // check that import service exists + importName := types.NamespacedName{ + Namespace: imp.Namespace, + Name: imp.Name, + } + + if err := m.client.Get(ctx, importName, &v1.Service{}); err != nil { + if k8serrors.IsNotFound(err) && m.crdMode { + return &importServiceNotExistError{name: importName} + } + + return err + } + + // get dataplane endpoint slices + dataplaneEndpointSliceList := discv1.EndpointSliceList{} + err := m.client.List( + ctx, + &dataplaneEndpointSliceList, + client.MatchingLabels{discv1.LabelServiceName: dpapp.Name}, + client.InNamespace(m.namespace)) + if err != nil { + return err + } + + // copy dataplane endpoint slices to import endpoint slices + dataplaneEndpointSlices := &dataplaneEndpointSliceList.Items + for i := range *dataplaneEndpointSlices { + err := m.checkImportEndpointSlice(ctx, imp, &(*dataplaneEndpointSlices)[i]) + if err != nil { + return err } } return nil } +func (m *Manager) deleteImportEndpointSlices(ctx context.Context, imp types.NamespacedName) error { + endpointSlices := discv1.EndpointSliceList{} + labelSelector := client.MatchingLabels{ + discv1.LabelManagedBy: AppName, + discv1.LabelServiceName: imp.Name, + } + if err := m.client.List(ctx, &endpointSlices, labelSelector, client.InNamespace(imp.Namespace)); err != nil { + return err + } + + for i := range endpointSlices.Items { + endpointSlice := &endpointSlices.Items[i] + m.logger.Infof("Deleting import endpoint slice: %s", endpointSlice.Name) + err := m.client.Delete(ctx, endpointSlice) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + } + + return nil +} + +func (m *Manager) getImport(ctx context.Context, name types.NamespacedName, imp *v1alpha1.Import) error { + if m.getImportCallback != nil { + return m.getImportCallback(name.Name, imp) + } + + return m.client.Get(ctx, name, imp) +} + +func (m *Manager) getMergeImportList(ctx context.Context) (*v1alpha1.ImportList, error) { + if m.getMergeImportListCallback != nil { + return m.getMergeImportListCallback(), nil + } + + mergeImportList := v1alpha1.ImportList{} + labelSelector := client.MatchingLabels{v1alpha1.LabelImportMerge: "true"} + if err := m.client.List(ctx, &mergeImportList, labelSelector); err != nil { + return nil, err + } + + return &mergeImportList, nil +} + +func checkServiceLabels(service *v1.Service, importName types.NamespacedName) bool { + if managedBy, ok := service.Labels[LabelManagedBy]; !ok || managedBy != AppName { + return false + } + + if name, ok := service.Labels[LabelImportName]; !ok || name != importName.Name { + return false + } + + if namespace, ok := service.Labels[LabelImportNamespace]; !ok || namespace != importName.Namespace { + return false + } + + return true +} + func SystemServiceName(name types.NamespacedName) string { //nolint:gosec // G401: use of weak cryptographic primitive is fine for service name hash := md5.New() hash.Write([]byte(name.Namespace + "/" + name.Name)) return fmt.Sprintf( "import-%s-%s-%x", - strings.ShortenString(name.Name, 10), - strings.ShortenString(name.Namespace, 10), + k8sstrings.ShortenString(name.Name, 10), + k8sstrings.ShortenString(name.Namespace, 10), hash.Sum(nil)) } @@ -569,6 +903,36 @@ func conditionChanged(conditions *[]metav1.Condition, cond *metav1.Condition) bo return oldCond.Message != cond.Message } +func endpointSliceChanged(endpointSlice1, endpointSlice2 *discv1.EndpointSlice) bool { + if endpointSlice1.AddressType != endpointSlice2.AddressType { + return true + } + + if !reflect.DeepEqual(endpointSlice1.Labels, endpointSlice2.Labels) { + return true + } + + if len(endpointSlice1.Endpoints) != len(endpointSlice2.Endpoints) { + return true + } + + for i := range endpointSlice1.Endpoints { + addresses1 := endpointSlice1.Endpoints[i].Addresses + addresses2 := endpointSlice2.Endpoints[i].Addresses + if len(addresses1) != len(addresses2) { + return true + } + + for j := range addresses1 { + if addresses1[j] != addresses2[j] { + return true + } + } + } + + return false +} + // NewManager returns a new control manager. func NewManager(cl client.Client, peerTLS *tls.ParsedCertData, namespace string, crdMode bool) *Manager { logger := logrus.WithField("component", "controlplane.control.manager") diff --git a/pkg/controlplane/control/port.go b/pkg/controlplane/control/port.go index 3fe5fd50..a4864880 100644 --- a/pkg/controlplane/control/port.go +++ b/pkg/controlplane/control/port.go @@ -45,6 +45,11 @@ func (e conflictingTargetPortError) Error() string { return fmt.Sprintf("port %d is already in use by service %v", e.port, e.leaseName) } +func (e conflictingTargetPortError) Is(target error) bool { + _, ok := target.(*conflictingTargetPortError) + return ok +} + // portManager leases ports for use by imported services. type portManager struct { lock sync.Mutex diff --git a/pkg/controlplane/rest/import.go b/pkg/controlplane/rest/import.go index 5ced8ba2..4f83d8ad 100644 --- a/pkg/controlplane/rest/import.go +++ b/pkg/controlplane/rest/import.go @@ -18,7 +18,9 @@ import ( "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" @@ -34,6 +36,7 @@ func toK8SImport(imp *store.Import, namespace string) *v1alpha1.Import { ObjectMeta: metav1.ObjectMeta{ Name: imp.Name, Namespace: namespace, + Labels: imp.Labels, }, Spec: imp.ImportSpec, } @@ -46,7 +49,8 @@ func importToAPI(imp *store.Import) *v1alpha1.Import { return &v1alpha1.Import{ ObjectMeta: metav1.ObjectMeta{ - Name: imp.Name, + Name: imp.Name, + Labels: imp.Labels, }, Spec: imp.ImportSpec, } @@ -170,6 +174,29 @@ func (m *Manager) GetAllImports() []*store.Import { return m.imports.GetAll() } +func (m *Manager) GetMergeImportList() *v1alpha1.ImportList { + mergeImportList := v1alpha1.ImportList{} + for _, imp := range m.imports.GetAll() { + if imp.Labels[v1alpha1.LabelImportMerge] != "true" { + continue + } + + mergeImportList.Items = append(mergeImportList.Items, *toK8SImport(imp, m.namespace)) + } + + return &mergeImportList +} + +func (m *Manager) GetK8sImport(name string, imp *v1alpha1.Import) error { + storeImport := m.imports.Get(name) + if storeImport == nil { + return errors.NewNotFound(schema.GroupResource{}, name) + } + + *imp = *toK8SImport(storeImport, m.namespace) + return nil +} + // Decode an import. func (h *importHandler) Decode(data []byte) (any, error) { var imp v1alpha1.Import diff --git a/pkg/controlplane/store/types.go b/pkg/controlplane/store/types.go index b701c936..4ce08341 100644 --- a/pkg/controlplane/store/types.go +++ b/pkg/controlplane/store/types.go @@ -72,6 +72,8 @@ type Import struct { v1alpha1.ImportSpec // Name of import. Name string + // Labels defined for the import + Labels map[string]string // Version of the struct when object was created. Version uint32 } @@ -81,6 +83,7 @@ func NewImport(imp *v1alpha1.Import) *Import { return &Import{ ImportSpec: imp.Spec, Name: imp.Name, + Labels: imp.Labels, Version: importStructVersion, } } diff --git a/pkg/operator/controller/instance_controller.go b/pkg/operator/controller/instance_controller.go index f6b42a9d..93945bde 100644 --- a/pkg/operator/controller/instance_controller.go +++ b/pkg/operator/controller/instance_controller.go @@ -66,6 +66,7 @@ type InstanceReconciler struct { // +kubebuilder:rbac:groups=clusterlink.net,resources=instances/status,verbs=get;update;patch // +kubebuilder:rbac:groups=clusterlink.net,resources=instances/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=services;serviceaccounts,verbs=list;get;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=list;get;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=nodes,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=pods,verbs=list;get;watch // +kubebuilder:rbac:groups=clusterlink.net,resources=exports;peers;accesspolicies;privilegedaccesspolicies,verbs=list;get;watch @@ -414,6 +415,13 @@ func (r *InstanceReconciler) createAccessControl(ctx context.Context, name, name "get", "list", "watch", "create", "delete", "update", }, }, + { + APIGroups: []string{"discovery.k8s.io"}, + Resources: []string{"endpointslices"}, + Verbs: []string{ + "get", "list", "watch", "create", "delete", "update", + }, + }, { APIGroups: []string{""}, Resources: []string{"pods"}, diff --git a/tests/e2e/k8s/services/httpecho/client.go b/tests/e2e/k8s/services/httpecho/client.go index a23bca44..7669411e 100644 --- a/tests/e2e/k8s/services/httpecho/client.go +++ b/tests/e2e/k8s/services/httpecho/client.go @@ -74,3 +74,13 @@ func GetEchoValue(cluster *util.KindCluster, server *util.Service) (string, erro return strings.TrimSpace(string(body)), nil } + +func RunClientInPod(cluster *util.KindCluster, server *util.Service) (string, error) { + body, err := cluster.RunPod(&util.Pod{ + Name: "echo-client", + Namespace: server.Namespace, + Image: "curlimages/curl", + Args: []string{"curl", "-s", "-m", "1", "http://" + server.Name}, + }) + return strings.TrimSpace(body), err +} diff --git a/tests/e2e/k8s/test_import.go b/tests/e2e/k8s/test_import.go index faf13371..8d8205d7 100644 --- a/tests/e2e/k8s/test_import.go +++ b/tests/e2e/k8s/test_import.go @@ -15,6 +15,7 @@ package k8s import ( "context" + "strconv" "strings" "github.com/stretchr/testify/require" @@ -24,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" + "github.com/clusterlink-net/clusterlink/pkg/bootstrap/platform" "github.com/clusterlink-net/clusterlink/pkg/controlplane/control" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services/httpecho" @@ -49,7 +51,7 @@ func (s *TestSuite) TestImportConflictingTargetPort() { require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp1)) // verify import service is created - require.Nil(s.T(), cl[0].WaitForImportCondition(imp1, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp1, v1alpha1.ImportServiceValid, true)) // create a second import with the same explicit target port imp2 := &v1alpha1.Import{ @@ -66,7 +68,7 @@ func (s *TestSuite) TestImportConflictingTargetPort() { require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp2)) // verify import status indicates a conflict - require.Nil(s.T(), cl[0].WaitForImportCondition(imp2, v1alpha1.ImportServiceCreated, false)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp2, v1alpha1.ImportServiceValid, false)) require.True(s.T(), meta.IsStatusConditionFalse(imp2.Status.Conditions, v1alpha1.ImportTargetPortValid)) // verify that service for the second import was not created @@ -82,7 +84,7 @@ func (s *TestSuite) TestImportConflictingTargetPort() { require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), imp2)) // verify the status is now good - require.Nil(s.T(), cl[0].WaitForImportCondition(imp2, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp2, v1alpha1.ImportServiceValid, true)) require.True(s.T(), meta.IsStatusConditionTrue(imp2.Status.Conditions, v1alpha1.ImportTargetPortValid)) // second import service should now exist (but return RST as it has no sources) @@ -122,7 +124,7 @@ func (s *TestSuite) TestImportConflictingService() { require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) // verify import status indicates service could not be created (due to conflict) - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, false)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, false)) // update the service to look as service managed by clusterlink service.Labels = make(map[string]string) @@ -132,7 +134,7 @@ func (s *TestSuite) TestImportConflictingService() { require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), service)) // check import status reflects that service was updated successfully - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) // verify service exist (RST instead of service not found) impService := &util.Service{ @@ -149,7 +151,7 @@ func (s *TestSuite) TestImportConflictingService() { _, err = cl[0].AccessService(httpecho.GetEchoValue, impService, true, &services.ConnectionResetError{}) require.ErrorIs(s.T(), err, &services.ConnectionResetError{}) // verify status is good - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) // update service managed-by label to non-clusterlink require.Nil(s.T(), cl[0].Cluster().Resources().Get(context.Background(), service.Name, service.Namespace, service)) @@ -157,14 +159,14 @@ func (s *TestSuite) TestImportConflictingService() { require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), service)) // verify import status indicates invalid - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, false)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, false)) // update managed-by label back to clusterlink service.Labels[control.LabelManagedBy] = control.AppName require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), service)) // verify access and status are back ok - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) _, err = cl[0].AccessService(httpecho.GetEchoValue, impService, true, &services.ConnectionResetError{}) require.ErrorIs(s.T(), err, &services.ConnectionResetError{}) } @@ -224,12 +226,12 @@ func (s *TestSuite) TestImportUnprivilegedNamespace() { require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) // verify status indicates invalid - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, false)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, false)) // delete the conflicting system service require.Nil(s.T(), cl[0].Cluster().Resources().Delete(context.Background(), systemService)) // wait for status to be good - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) // update the import service to a bad service managed by other require.Nil(s.T(), cl[0].Cluster().Resources().Get(context.Background(), service.Name, service.Namespace, service)) @@ -238,14 +240,14 @@ func (s *TestSuite) TestImportUnprivilegedNamespace() { require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), service)) // wait for status to indicate invalid - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, false)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, false)) // return service to be managed by clusterlink service.Labels[control.LabelManagedBy] = control.AppName require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), service)) // wait for status to be good - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) // verify imported service exists (getting a RST since no sources are defined) impService := &util.Service{ @@ -282,7 +284,7 @@ func (s *TestSuite) TestImportDelete() { require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) // wait for status to indicate service was created - require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) // delete import require.Nil(s.T(), cl[0].Cluster().Resources().Delete(context.Background(), imp)) @@ -308,7 +310,7 @@ func (s *TestSuite) TestImportDelete() { } require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp2)) // verify status is good - require.Nil(s.T(), cl[0].WaitForImportCondition(imp2, v1alpha1.ImportServiceCreated, true)) + require.Nil(s.T(), cl[0].WaitForImportCondition(imp2, v1alpha1.ImportServiceValid, true)) } // this test requires K8s 1.29+, as it relies on x-kubernetes-validations. @@ -334,3 +336,130 @@ func (s *TestSuite) TestImportInvalidName() { imp.Name = strings.Repeat("a", 63) require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) } + +func (s *TestSuite) TestImportMerge() { + testFunc := func(crdMode bool) { + cfg := &util.PeerConfig{ + CRUDMode: !crdMode, + DataplaneType: platform.DataplaneTypeEnvoy, + Dataplanes: 1, + } + + cl, err := s.fabric.DeployClusterlinks(1, cfg) + require.Nil(s.T(), err) + + // create export, peer, and allow-all policy + require.Nil(s.T(), cl[0].CreateService(&httpEchoService)) + require.Nil(s.T(), cl[0].CreateExport(&httpEchoService)) + require.Nil(s.T(), cl[0].CreatePolicy(util.PolicyAllowAll)) + require.Nil(s.T(), cl[0].CreatePeer(cl[0])) + + importedService := &util.Service{ + Name: "imported", + Port: 80, + } + + // create merge import + imp := &v1alpha1.Import{ + ObjectMeta: metav1.ObjectMeta{ + Name: importedService.Name, + Namespace: cl[0].Namespace(), + Labels: map[string]string{ + v1alpha1.LabelImportMerge: "true", + }, + }, + Spec: v1alpha1.ImportSpec{ + Port: importedService.Port, + Sources: []v1alpha1.ImportSource{{ + Peer: cl[0].Name(), + ExportName: httpEchoService.Name, + ExportNamespace: cl[0].Namespace(), + }}, + }, + } + if crdMode { + require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) + + // verify status is bad, since imported service should be pre-created for a merge import + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, false)) + } else { + // CRUD mode will fail since service does not exist, and operation is not async + require.NotNil(s.T(), cl[0].Client().Imports.Create(imp)) + } + + // create the import service + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: importedService.Name, + Namespace: cl[0].Namespace(), + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Port: int32(importedService.Port), + }}, + }, + } + require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), service)) + + if crdMode { + // verify status becomes good + require.Nil(s.T(), cl[0].WaitForImportCondition(imp, v1alpha1.ImportServiceValid, true)) + } else { + // update import to re-try endpoint slice creation + require.Nil(s.T(), cl[0].Client().Imports.Update(imp)) + } + + // verify service access + data, err := cl[0].AccessService(httpecho.RunClientInPod, importedService, true, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), cl[0].Name(), data) + + // update dataplane endpoint slice via scaling + require.Nil(s.T(), cl[0].ScaleDataplane(0)) + require.Nil(s.T(), cl[0].ScaleDataplane(1)) + + // verify service access + _, err = cl[0].AccessService(httpecho.RunClientInPod, importedService, true, nil) + require.Nil(s.T(), err) + + // delete dataplane endpoint slice by deleting the dataplane service + var dataplaneService v1.Service + require.Nil(s.T(), cl[0].Cluster().Resources().Get( + context.Background(), "cl-dataplane", cl[0].Namespace(), &dataplaneService)) + require.Nil(s.T(), cl[0].Cluster().Resources().Delete( + context.Background(), &dataplaneService)) + + // verify no access + _, err = cl[0].AccessService(httpecho.RunClientInPod, importedService, true, &util.PodFailedError{}) + require.ErrorIs(s.T(), err, &util.PodFailedError{}) + + // create dataplane endpoint slice + dataplaneService.ResourceVersion = "" + require.Nil(s.T(), cl[0].Cluster().Resources().Create( + context.Background(), &dataplaneService)) + + // verify access is back + _, err = cl[0].AccessService(httpecho.RunClientInPod, importedService, true, nil) + require.Nil(s.T(), err) + + // remove merge property of import + delete(imp.Labels, v1alpha1.LabelImportMerge) + if crdMode { + require.Nil(s.T(), cl[0].Cluster().Resources().Update(context.Background(), imp)) + } else { + // CRUD mode will fail since service does not exist, and operation is not async + require.NotNil(s.T(), cl[0].Client().Imports.Update(imp)) + } + + // verify no access + _, err = cl[0].AccessService( + httpecho.RunClientInPod, importedService, true, &util.PodFailedError{}) + require.ErrorIs(s.T(), err, &util.PodFailedError{}) + } + + // run test on both CRDMode = {true, false} + for _, crdMode := range []bool{true, false} { + testName := "CRDMode" + strings.ToUpper(strconv.FormatBool(crdMode)) + s.RunSubTest(testName, func() { testFunc(crdMode) }) + } +} diff --git a/tests/e2e/k8s/util/clusterlink.go b/tests/e2e/k8s/util/clusterlink.go index 5b5c144f..cf5a9d06 100644 --- a/tests/e2e/k8s/util/clusterlink.go +++ b/tests/e2e/k8s/util/clusterlink.go @@ -147,6 +147,8 @@ func (c *ClusterLink) AccessService( continue case errors.Is(err, &services.ConnectionResetError{}): continue + case errors.Is(err, &PodFailedError{}): + continue case err == nil && expectedError != nil: continue } @@ -362,13 +364,13 @@ func (c *ClusterLink) DeleteImport(name string) error { } func (c *ClusterLink) CreatePolicy(policy *v1alpha1.AccessPolicy) error { - if c.crdMode { - if policy.Namespace == "" { - accessPolicyCopy := *policy - accessPolicyCopy.Namespace = c.namespace - policy = &accessPolicyCopy - } + if policy.Namespace == "" { + accessPolicyCopy := *policy + accessPolicyCopy.Namespace = c.namespace + policy = &accessPolicyCopy + } + if c.crdMode { return c.cluster.Resources().Create(context.Background(), policy) } diff --git a/tests/e2e/k8s/util/fabric.go b/tests/e2e/k8s/util/fabric.go index 5743c34b..076544b6 100644 --- a/tests/e2e/k8s/util/fabric.go +++ b/tests/e2e/k8s/util/fabric.go @@ -147,6 +147,19 @@ func (f *Fabric) SwitchToNewNamespace(name string, appendName bool) error { if f.namespace != "" { // delete old namespace for _, p := range f.peers { + // delete imports to avoid slowing down upcoming tests + var imports v1alpha1.ImportList + if err := p.cluster.Resources().List(context.Background(), &imports); err != nil { + return err + } + + for i := range imports.Items { + err := p.cluster.Resources().Delete(context.Background(), &(imports.Items[i])) + if err != nil { + return err + } + } + if err := p.cluster.DeleteNamespace(f.namespace); err != nil { return fmt.Errorf("cannot delete namespace %s: %w", f.namespace, err) } diff --git a/tests/e2e/k8s/util/kind.go b/tests/e2e/k8s/util/kind.go index fb3fbfeb..b5ee0ab0 100644 --- a/tests/e2e/k8s/util/kind.go +++ b/tests/e2e/k8s/util/kind.go @@ -46,6 +46,13 @@ const ( ExportedLogsPath = "/tmp/clusterlink-k8s-tests" ) +// PodFailedError represents a pod that ran and returned a failure. +type PodFailedError struct{} + +func (e PodFailedError) Error() string { + return "pod failed" +} + // Service represents a kubernetes service. type Service struct { // Name is the service name. @@ -333,6 +340,9 @@ func (c *KindCluster) RunPod(podSpec *Pod) (string, error) { return "", fmt.Errorf("cannot get pod status: %w", err) } if pod.Status.Phase != v1.PodSucceeded { + if pod.Status.Phase == v1.PodFailed { + return "", &PodFailedError{} + } return "", fmt.Errorf("pod did not succeed: %s", pod.Status.Phase) }