Skip to content

Commit

Permalink
v1alpha1.Import: Support merging with an existing service
Browse files Browse the repository at this point in the history
This commit adds a new option for Imports to be merged with an existing
K8s service. This works by adding an endpoint slices to the exsting service,
which points to the clusterlink dataplane service.

Signed-off-by: Or Ozeri <oro@il.ibm.com>
  • Loading branch information
orozery committed Apr 16, 2024
1 parent 8047fed commit b8eccd4
Show file tree
Hide file tree
Showing 16 changed files with 777 additions and 157 deletions.
41 changes: 24 additions & 17 deletions cmd/cl-controlplane/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
discv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -150,25 +151,29 @@ func (o *Options) Run() error {
return fmt.Errorf("unable to add core v1 objects to scheme: %w", err)
}

if err := discv1.AddToScheme(scheme); err != nil {
return fmt.Errorf("unable to add discovery v1 objects to scheme: %w", err)
}

// set logger for controller-runtime components
ctrl.SetLogger(logrusr.New(logrus.WithField("component", "k8s.controller-runtime")))

// limit watch for v1alpha1.Peer to the namespace given by 'namespace'
managerOptions := manager.Options{}
managerOptions := manager.Options{
Cache: cache.Options{
ByObject: make(map[client.Object]cache.ByObject),
},
Scheme: scheme,
}

// limit watch for v1alpha1.Peer and EndpointSlice to the namespace given by 'namespace'
if o.CRDMode {
managerOptions = manager.Options{
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&v1alpha1.Peer{}: {
Namespaces: map[string]cache.Config{
namespace: {},
},
},
},
managerOptions.Cache.ByObject[&v1alpha1.Peer{}] = cache.ByObject{
Namespaces: map[string]cache.Config{
namespace: {},
},
Scheme: scheme,
}
}

mgr, err := manager.New(config, managerOptions)
if err != nil {
return fmt.Errorf(
Expand Down Expand Up @@ -198,6 +203,11 @@ func (o *Options) Run() error {

controlManager := control.NewManager(mgr.GetClient(), parsedCertData, namespace, o.CRDMode)

err = control.CreateControllers(controlManager, mgr, o.CRDMode)
if err != nil {
return fmt.Errorf("cannot create control controllers: %w", err)
}

xdsManager := xds.NewManager(o.CRDMode)
xds.RegisterService(
context.Background(), xdsManager, grpcServer.GetGRPCServer())
Expand All @@ -207,11 +217,6 @@ func (o *Options) Run() error {
if err != nil {
return fmt.Errorf("cannot create xDS controllers: %w", err)
}

err = control.CreateControllers(controlManager, mgr)
if err != nil {
return fmt.Errorf("cannot create control controllers: %w", err)
}
} else {
// open store
kvStore, err := bolt.Open(StoreFile)
Expand All @@ -235,6 +240,8 @@ func (o *Options) Run() error {

cprest.RegisterHandlers(restManager, httpServer)

controlManager.SetGetMergeImportListCallback(restManager.GetMergeImportList)
controlManager.SetGetImportCallback(restManager.GetK8sImport)
controlManager.SetStatusCallback(func(pr *v1alpha1.Peer) {
authzManager.AddPeer(pr)
})
Expand Down
10 changes: 9 additions & 1 deletion cmd/gwctl/subcommand/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type importOptions struct {
name string
port uint16
peers []string
merge bool
}

// ImportCreateCmd - create an imported service.
Expand Down Expand Up @@ -76,6 +77,7 @@ func (o *importOptions) addFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.name, "name", "", "Imported service name")
fs.Uint16Var(&o.port, "port", 0, "Imported service port")
fs.StringSliceVar(&o.peers, "peer", []string{}, "Remote peer to import the service from")
fs.BoolVar(&o.merge, "merge", false, "Merge with an existing service endpoint")
}

// run performs the execution of the 'create import' or 'update import' subcommand.
Expand All @@ -96,9 +98,15 @@ func (o *importOptions) run(isUpdate bool) error {
sources[i].ExportName = o.name
}

labels := make(map[string]string)
if o.merge {
labels[v1alpha1.LabelImportMerge] = "true"
}

err = importOperation(&v1alpha1.Import{
ObjectMeta: metav1.ObjectMeta{
Name: o.name,
Name: o.name,
Labels: labels,
},
Spec: v1alpha1.ImportSpec{
Port: o.port,
Expand Down
12 changes: 12 additions & 0 deletions config/operator/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ rules:
- get
- patch
- update
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/clusterlink.net/v1alpha1/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ type ImportSpec struct {
const (
// ImportTargetPortValid is a condition type for indicating whether the import target port is valid.
ImportTargetPortValid string = "ImportTargetPortValid"
// ImportServiceCreated is a condition type for indicating whether the import service was successfully created.
ImportServiceCreated string = "ImportServiceCreated"
// ImportServiceValid is a condition type for indicating whether the import service exists and valid.
ImportServiceValid string = "ImportServiceValid"

LabelImportMerge string = "import.clusterlink.net/merge"
)

// ImportStatus represents the status of an imported service.
Expand Down
3 changes: 3 additions & 0 deletions pkg/bootstrap/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
Expand Down
107 changes: 62 additions & 45 deletions pkg/controlplane/control/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,64 +18,81 @@ import (

"github.com/clusterlink-net/clusterlink/pkg/util/controller"
v1 "k8s.io/api/core/v1"
discv1 "k8s.io/api/discovery/v1"

"k8s.io/apimachinery/pkg/types"

"github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
)

// CreateControllers creates the various k8s controllers used to update the control manager.
func CreateControllers(mgr *Manager, controllerManager ctrl.Manager) error {
err := controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.peer",
Object: &v1alpha1.Peer{},
AddHandler: func(ctx context.Context, object any) error {
mgr.AddPeer(object.(*v1alpha1.Peer))
return nil
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
mgr.DeletePeer(name.Name)
return nil
},
})
if err != nil {
return err
}
func CreateControllers(mgr *Manager, controllerManager ctrl.Manager, crdMode bool) error {
if crdMode {
err := controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.peer",
Object: &v1alpha1.Peer{},
AddHandler: func(ctx context.Context, object any) error {
mgr.AddPeer(object.(*v1alpha1.Peer))
return nil
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
mgr.DeletePeer(name.Name)
return nil
},
})
if err != nil {
return err
}
err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.service",
Object: &v1.Service{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addService(ctx, object.(*v1.Service))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return mgr.deleteService(ctx, name)
},
})
if err != nil {
return err
}

err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.service",
Object: &v1.Service{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addService(ctx, object.(*v1.Service))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return mgr.deleteService(ctx, name)
},
})
if err != nil {
return err
}
err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.export",
Object: &v1alpha1.Export{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addExport(ctx, object.(*v1alpha1.Export))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return nil
},
})
if err != nil {
return err
}

err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.export",
Object: &v1alpha1.Export{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addExport(ctx, object.(*v1alpha1.Export))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return nil
},
})
if err != nil {
return err
err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.import",
Object: &v1alpha1.Import{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.AddImport(ctx, object.(*v1alpha1.Import))
},
DeleteHandler: mgr.DeleteImport,
})
if err != nil {
return err
}
}

return controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.import",
Object: &v1alpha1.Import{},
Name: "control.endpointslice",
Object: &discv1.EndpointSlice{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.AddImport(ctx, object.(*v1alpha1.Import))
return mgr.addEndpointSlice(ctx, object.(*discv1.EndpointSlice))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return mgr.deleteEndpointSlice(ctx, name)
},
DeleteHandler: mgr.DeleteImport,
})
}
Loading

0 comments on commit b8eccd4

Please sign in to comment.