Skip to content

Commit

Permalink
Merge pull request #446 from praveingk/endpointslice
Browse files Browse the repository at this point in the history
Support creating EndpointSlice while importing service
  • Loading branch information
orozery authored Apr 16, 2024
2 parents 8047fed + b8eccd4 commit d591fac
Show file tree
Hide file tree
Showing 16 changed files with 777 additions and 157 deletions.
41 changes: 24 additions & 17 deletions cmd/cl-controlplane/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
discv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -150,25 +151,29 @@ func (o *Options) Run() error {
return fmt.Errorf("unable to add core v1 objects to scheme: %w", err)
}

if err := discv1.AddToScheme(scheme); err != nil {
return fmt.Errorf("unable to add discovery v1 objects to scheme: %w", err)
}

// set logger for controller-runtime components
ctrl.SetLogger(logrusr.New(logrus.WithField("component", "k8s.controller-runtime")))

// limit watch for v1alpha1.Peer to the namespace given by 'namespace'
managerOptions := manager.Options{}
managerOptions := manager.Options{
Cache: cache.Options{
ByObject: make(map[client.Object]cache.ByObject),
},
Scheme: scheme,
}

// limit watch for v1alpha1.Peer and EndpointSlice to the namespace given by 'namespace'
if o.CRDMode {
managerOptions = manager.Options{
Cache: cache.Options{
ByObject: map[client.Object]cache.ByObject{
&v1alpha1.Peer{}: {
Namespaces: map[string]cache.Config{
namespace: {},
},
},
},
managerOptions.Cache.ByObject[&v1alpha1.Peer{}] = cache.ByObject{
Namespaces: map[string]cache.Config{
namespace: {},
},
Scheme: scheme,
}
}

mgr, err := manager.New(config, managerOptions)
if err != nil {
return fmt.Errorf(
Expand Down Expand Up @@ -198,6 +203,11 @@ func (o *Options) Run() error {

controlManager := control.NewManager(mgr.GetClient(), parsedCertData, namespace, o.CRDMode)

err = control.CreateControllers(controlManager, mgr, o.CRDMode)
if err != nil {
return fmt.Errorf("cannot create control controllers: %w", err)
}

xdsManager := xds.NewManager(o.CRDMode)
xds.RegisterService(
context.Background(), xdsManager, grpcServer.GetGRPCServer())
Expand All @@ -207,11 +217,6 @@ func (o *Options) Run() error {
if err != nil {
return fmt.Errorf("cannot create xDS controllers: %w", err)
}

err = control.CreateControllers(controlManager, mgr)
if err != nil {
return fmt.Errorf("cannot create control controllers: %w", err)
}
} else {
// open store
kvStore, err := bolt.Open(StoreFile)
Expand All @@ -235,6 +240,8 @@ func (o *Options) Run() error {

cprest.RegisterHandlers(restManager, httpServer)

controlManager.SetGetMergeImportListCallback(restManager.GetMergeImportList)
controlManager.SetGetImportCallback(restManager.GetK8sImport)
controlManager.SetStatusCallback(func(pr *v1alpha1.Peer) {
authzManager.AddPeer(pr)
})
Expand Down
10 changes: 9 additions & 1 deletion cmd/gwctl/subcommand/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type importOptions struct {
name string
port uint16
peers []string
merge bool
}

// ImportCreateCmd - create an imported service.
Expand Down Expand Up @@ -76,6 +77,7 @@ func (o *importOptions) addFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.name, "name", "", "Imported service name")
fs.Uint16Var(&o.port, "port", 0, "Imported service port")
fs.StringSliceVar(&o.peers, "peer", []string{}, "Remote peer to import the service from")
fs.BoolVar(&o.merge, "merge", false, "Merge with an existing service endpoint")
}

// run performs the execution of the 'create import' or 'update import' subcommand.
Expand All @@ -96,9 +98,15 @@ func (o *importOptions) run(isUpdate bool) error {
sources[i].ExportName = o.name
}

labels := make(map[string]string)
if o.merge {
labels[v1alpha1.LabelImportMerge] = "true"
}

err = importOperation(&v1alpha1.Import{
ObjectMeta: metav1.ObjectMeta{
Name: o.name,
Name: o.name,
Labels: labels,
},
Spec: v1alpha1.ImportSpec{
Port: o.port,
Expand Down
12 changes: 12 additions & 0 deletions config/operator/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ rules:
- get
- patch
- update
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/clusterlink.net/v1alpha1/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ type ImportSpec struct {
const (
// ImportTargetPortValid is a condition type for indicating whether the import target port is valid.
ImportTargetPortValid string = "ImportTargetPortValid"
// ImportServiceCreated is a condition type for indicating whether the import service was successfully created.
ImportServiceCreated string = "ImportServiceCreated"
// ImportServiceValid is a condition type for indicating whether the import service exists and valid.
ImportServiceValid string = "ImportServiceValid"

LabelImportMerge string = "import.clusterlink.net/merge"
)

// ImportStatus represents the status of an imported service.
Expand Down
3 changes: 3 additions & 0 deletions pkg/bootstrap/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
Expand Down
107 changes: 62 additions & 45 deletions pkg/controlplane/control/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,64 +18,81 @@ import (

"github.com/clusterlink-net/clusterlink/pkg/util/controller"
v1 "k8s.io/api/core/v1"
discv1 "k8s.io/api/discovery/v1"

"k8s.io/apimachinery/pkg/types"

"github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
)

// CreateControllers creates the various k8s controllers used to update the control manager.
func CreateControllers(mgr *Manager, controllerManager ctrl.Manager) error {
err := controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.peer",
Object: &v1alpha1.Peer{},
AddHandler: func(ctx context.Context, object any) error {
mgr.AddPeer(object.(*v1alpha1.Peer))
return nil
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
mgr.DeletePeer(name.Name)
return nil
},
})
if err != nil {
return err
}
func CreateControllers(mgr *Manager, controllerManager ctrl.Manager, crdMode bool) error {
if crdMode {
err := controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.peer",
Object: &v1alpha1.Peer{},
AddHandler: func(ctx context.Context, object any) error {
mgr.AddPeer(object.(*v1alpha1.Peer))
return nil
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
mgr.DeletePeer(name.Name)
return nil
},
})
if err != nil {
return err
}
err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.service",
Object: &v1.Service{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addService(ctx, object.(*v1.Service))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return mgr.deleteService(ctx, name)
},
})
if err != nil {
return err
}

err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.service",
Object: &v1.Service{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addService(ctx, object.(*v1.Service))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return mgr.deleteService(ctx, name)
},
})
if err != nil {
return err
}
err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.export",
Object: &v1alpha1.Export{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addExport(ctx, object.(*v1alpha1.Export))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return nil
},
})
if err != nil {
return err
}

err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.export",
Object: &v1alpha1.Export{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.addExport(ctx, object.(*v1alpha1.Export))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return nil
},
})
if err != nil {
return err
err = controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.import",
Object: &v1alpha1.Import{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.AddImport(ctx, object.(*v1alpha1.Import))
},
DeleteHandler: mgr.DeleteImport,
})
if err != nil {
return err
}
}

return controller.AddToManager(controllerManager, &controller.Spec{
Name: "control.import",
Object: &v1alpha1.Import{},
Name: "control.endpointslice",
Object: &discv1.EndpointSlice{},
AddHandler: func(ctx context.Context, object any) error {
return mgr.AddImport(ctx, object.(*v1alpha1.Import))
return mgr.addEndpointSlice(ctx, object.(*discv1.EndpointSlice))
},
DeleteHandler: func(ctx context.Context, name types.NamespacedName) error {
return mgr.deleteEndpointSlice(ctx, name)
},
DeleteHandler: mgr.DeleteImport,
})
}
Loading

0 comments on commit d591fac

Please sign in to comment.