Using ClusterCacheTracker instead of remote.NewClusterClient
laozc committed Aug 3, 2023
1 parent 7de8e8b commit 124988a
Showing 6 changed files with 113 additions and 27 deletions.
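
In short, the commit stops creating a fresh workload-cluster client on every reconcile via remote.NewClusterClient and instead builds one shared remote.ClusterCacheTracker at startup, which controllers query for cached clients. Below is a minimal sketch of that pattern, assuming only the cluster-api remote package and controller-runtime as imported in the diff; the helper names newTracker and guestClient are illustrative and not part of the commit.

package example

import (
	"context"

	"sigs.k8s.io/cluster-api/controllers/remote"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newTracker builds a single ClusterCacheTracker from the manager at startup;
// the commit passes the resulting tracker into each controller that needs
// access to a workload cluster.
func newTracker(mgr ctrl.Manager) (*remote.ClusterCacheTracker, error) {
	return remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{
		Indexes: []remote.Index{remote.NodeProviderIDIndex},
	})
}

// guestClient shows the consumer side: instead of dialing a new connection per
// reconcile, controllers ask the shared tracker for a cached client keyed by
// the Cluster's namespace/name.
func guestClient(ctx context.Context, tracker *remote.ClusterCacheTracker, key client.ObjectKey) (client.Client, error) {
	return tracker.GetClient(ctx, key)
}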
31 changes: 28 additions & 3 deletions controllers/controllers_suite_test.go
@@ -28,7 +28,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
@@ -63,6 +65,29 @@ func setup() {

testEnv = helpers.NewTestEnvironment()

+ secretCachingClient, err := client.New(testEnv.Manager.GetConfig(), client.Options{
+ HTTPClient: testEnv.Manager.GetHTTPClient(),
+ Cache: &client.CacheOptions{
+ Reader: testEnv.Manager.GetCache(),
+ },
+ })
+ if err != nil {
+ panic("unable to create secret caching client")
+ }
+
+ tracker, err := remote.NewClusterCacheTracker(
+ testEnv.Manager,
+ remote.ClusterCacheTrackerOptions{
+ SecretCachingClient: secretCachingClient,
+ ControllerName: "",
+ Log: nil,
+ Indexes: []remote.Index{remote.NodeProviderIDIndex},
+ },
+ )
+ if err != nil {
+ panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err))
+ }
+
controllerOpts := controller.Options{MaxConcurrentReconciles: 10}

if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
@@ -71,7 +96,7 @@ func setup() {
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err))
}
- if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
+ if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err))
}
if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
@@ -80,10 +105,10 @@ func setup() {
if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VSphereDeploymentZone controller: %v", err))
}
- if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
+ if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err))
}
- if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, controllerOpts); err != nil {
+ if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err))
}

10 changes: 5 additions & 5 deletions controllers/serviceaccount_controller.go
@@ -66,7 +66,7 @@ const (
)

// AddServiceAccountProviderControllerToManager adds this controller to the provided manager.
- func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
+ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
var (
controlledType = &vmwarev1.ProviderServiceAccount{}
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
@@ -82,8 +82,8 @@ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManager
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := ServiceAccountReconciler{
- ControllerContext: controllerContext,
- remoteClientGetter: remote.NewClusterClient,
+ ControllerContext: controllerContext,
+ remoteClusterCacheTracker: tracker,
}

clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx)
@@ -134,7 +134,7 @@ func clusterToSupervisorInfrastructureMapFunc(managerContext *context.Controller
type ServiceAccountReconciler struct {
*context.ControllerContext

- remoteClientGetter remote.ClusterClientGetter
+ remoteClusterCacheTracker *remote.ClusterCacheTracker
}

func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
@@ -202,7 +202,7 @@ func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Reque
// then just return a no-op and wait for the next sync. This will occur when
// the Cluster's status is updated with a reference to the secret that has
// the Kubeconfig data used to access the target cluster.
- guestClient, err := r.remoteClientGetter(clusterContext, ProviderServiceAccountControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster))
+ guestClient, err := r.remoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster))
if err != nil {
clusterContext.Logger.Info("The control plane is not ready yet", "err", err)
return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil
10 changes: 5 additions & 5 deletions controllers/servicediscovery_controller.go
@@ -72,7 +72,7 @@ const (
// +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get

// AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager.
- func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
+ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
var (
controllerNameShort = ServiceDiscoveryControllerName
controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, ServiceDiscoveryControllerName)
@@ -84,8 +84,8 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := serviceDiscoveryReconciler{
- ControllerContext: controllerContext,
- remoteClientGetter: remote.NewClusterClient,
+ ControllerContext: controllerContext,
+ remoteClusterCacheTracker: tracker,
}

configMapCache, err := cache.New(mgr.GetConfig(), cache.Options{
@@ -128,7 +128,7 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
type serviceDiscoveryReconciler struct {
*context.ControllerContext

- remoteClientGetter remote.ClusterClientGetter
+ remoteClusterCacheTracker *remote.ClusterCacheTracker
}

func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
@@ -189,7 +189,7 @@ func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Req

// We cannot proceed until we are able to access the target cluster. Until
// then just return a no-op and wait for the next sync.
- guestClient, err := r.remoteClientGetter(clusterContext, ServiceDiscoveryControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster))
+ guestClient, err := r.remoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster))
if err != nil {
logger.Info("The control plane is not ready yet", "err", err)
return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil
12 changes: 7 additions & 5 deletions controllers/vspherevm_controller.go
@@ -70,7 +70,7 @@ import (
// AddVMControllerToManager adds the VM controller to the provided manager.
//
//nolint:forcetypeassert
- func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, options controller.Options) error {
+ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker, options controller.Options) error {
var (
controlledType = &infrav1.VSphereVM{}
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
@@ -88,8 +88,9 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := vmReconciler{
- ControllerContext: controllerContext,
- VMService: &govmomi.VMService{},
+ ControllerContext: controllerContext,
+ VMService: &govmomi.VMService{},
+ remoteClusterCacheTracker: tracker,
}
controller, err := ctrl.NewControllerManagedBy(mgr).
// Watch the controlled, infrastructure resource.
@@ -158,7 +159,8 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
type vmReconciler struct {
*context.ControllerContext

- VMService services.VirtualMachineService
+ VMService services.VirtualMachineService
+ remoteClusterCacheTracker *remote.ClusterCacheTracker
}

// Reconcile ensures the back-end state reflects the Kubernetes resource state intent.
@@ -368,7 +370,7 @@ func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error {
if err != nil {
return err
}
- clusterClient, err := remote.NewClusterClient(ctx, r.ControllerContext.Name, r.Client, ctrlclient.ObjectKeyFromObject(cluster))
+ clusterClient, err := r.remoteClusterCacheTracker.GetClient(ctx, ctrlclient.ObjectKeyFromObject(cluster))
if err != nil {
return err
}
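
To illustrate the consumer side changed just above in deleteNode, here is a hedged sketch of the same pattern as a standalone helper; the wrapper name deleteWorkloadNode is hypothetical, while tracker.GetClient and the controller-runtime Delete call mirror the diff.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/cluster-api/controllers/remote"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// deleteWorkloadNode fetches a cached client for the workload cluster from the
// shared tracker and deletes the named Node, mirroring vmReconciler.deleteNode.
func deleteWorkloadNode(ctx context.Context, tracker *remote.ClusterCacheTracker, clusterKey ctrlclient.ObjectKey, nodeName string) error {
	c, err := tracker.GetClient(ctx, clusterKey)
	if err != nil {
		// GetClient fails until the workload cluster's control plane is
		// reachable; callers typically requeue instead of treating this as fatal.
		return err
	}
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
	return c.Delete(ctx, node)
}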
16 changes: 14 additions & 2 deletions controllers/vspherevm_controller_test.go
@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apirecord "k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1alpha1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -73,6 +74,16 @@ func TestReconcileNormal_WaitingForIPAddrAllocation(t *testing.T) {
}
defer simr.Destroy()

+ tracker, err := remote.NewClusterCacheTracker(
+ testEnv.Manager,
+ remote.ClusterCacheTrackerOptions{
+ Indexes: []remote.Index{remote.NodeProviderIDIndex},
+ },
+ )
+ if err != nil {
+ t.Fatalf("unable to setup ClusterCacheTracker: %v", err)
+ }
+
create := func(netSpec infrav1.NetworkSpec) func() {
return func() {
vsphereCluster = &infrav1.VSphereCluster{
@@ -177,8 +188,9 @@ func TestReconcileNormal_WaitingForIPAddrAllocation(t *testing.T) {
Logger: log.Log,
}
return vmReconciler{
- ControllerContext: controllerContext,
- VMService: vmService,
+ ControllerContext: controllerContext,
+ VMService: vmService,
+ remoteClusterCacheTracker: tracker,
}
}

61 changes: 54 additions & 7 deletions main.go
@@ -25,6 +25,7 @@ import (
goruntime "runtime"
"time"

perrors "github.com/pkg/errors"
"github.com/spf13/pflag"
"gopkg.in/fsnotify.v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -39,6 +40,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util/flags"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals"
@@ -250,14 +252,20 @@ func main() {

// Create a function that adds all the controllers and webhooks to the manager.
addToManager := func(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
+ tracker, err := setupRemoteClusterCacheTracker(ctx, mgr)
+ if err != nil {
+ setupLog.Error(err, "unable to create remote cluster tracker tracker")
+ return err
+ }
+
// Check for non-supervisor VSphereCluster and start controller if found
gvr := v1beta1.GroupVersion.WithResource(reflect.TypeOf(&v1beta1.VSphereCluster{}).Elem().Name())
isLoaded, err := isCRDDeployed(mgr, gvr)
if err != nil {
return err
}
if isLoaded {
- if err := setupVAPIControllers(ctx, mgr); err != nil {
+ if err := setupVAPIControllers(ctx, mgr, tracker); err != nil {
return fmt.Errorf("setupVAPIControllers: %w", err)
}
} else {
@@ -271,7 +279,7 @@ func main() {
return err
}
if isLoaded {
- if err := setupSupervisorControllers(ctx, mgr); err != nil {
+ if err := setupSupervisorControllers(ctx, mgr, tracker); err != nil {
return fmt.Errorf("setupSupervisorControllers: %w", err)
}
} else {
@@ -318,7 +326,7 @@ func main() {
defer session.Clear()
}

- func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
+ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error {
if err := (&v1beta1.VSphereClusterTemplate{}).SetupWebhookWithManager(mgr); err != nil {
return err
}
@@ -349,7 +357,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man
if err := controllers.AddMachineControllerToManager(ctx, mgr, &v1beta1.VSphereMachine{}, concurrency(vSphereMachineConcurrency)); err != nil {
return err
}
- if err := controllers.AddVMControllerToManager(ctx, mgr, concurrency(vSphereVMConcurrency)); err != nil {
+ if err := controllers.AddVMControllerToManager(ctx, mgr, tracker, concurrency(vSphereVMConcurrency)); err != nil {
return err
}
if err := controllers.AddVsphereClusterIdentityControllerToManager(ctx, mgr, concurrency(vSphereClusterIdentityConcurrency)); err != nil {
@@ -359,7 +367,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man
return controllers.AddVSphereDeploymentZoneControllerToManager(ctx, mgr, concurrency(vSphereDeploymentZoneConcurrency))
}

- func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
+ func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error {
if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1b1.VSphereCluster{}, concurrency(vSphereClusterConcurrency)); err != nil {
return err
}
@@ -368,11 +376,11 @@ func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlm
return err
}

- if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, concurrency(providerServiceAccountConcurrency)); err != nil {
+ if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, tracker, concurrency(providerServiceAccountConcurrency)); err != nil {
return err
}

- return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, concurrency(serviceDiscoveryConcurrency))
+ return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, tracker, concurrency(serviceDiscoveryConcurrency))
}

func setupChecks(mgr ctrlmgr.Manager) {
@@ -409,3 +417,42 @@ func isCRDDeployed(mgr ctrlmgr.Manager, gvr schema.GroupVersionResource) (bool,
func concurrency(c int) controller.Options {
return controller.Options{MaxConcurrentReconciles: c}
}

+ func setupRemoteClusterCacheTracker(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) (*remote.ClusterCacheTracker, error) {
+ secretCachingClient, err := client.New(mgr.GetConfig(), client.Options{
+ HTTPClient: mgr.GetHTTPClient(),
+ Cache: &client.CacheOptions{
+ Reader: mgr.GetCache(),
+ },
+ })
+ if err != nil {
+ return nil, perrors.Wrapf(err, "unable to create secret caching client")
+ }
+
+ // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
+ // requiring a connection to a remote cluster
+ log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")
+ tracker, err := remote.NewClusterCacheTracker(
+ mgr,
+ remote.ClusterCacheTrackerOptions{
+ SecretCachingClient: secretCachingClient,
+ ControllerName: controllerName,
+ Log: &log,
+ Indexes: []remote.Index{remote.NodeProviderIDIndex},
+ },
+ )
+ if err != nil {
+ return nil, perrors.Wrapf(err, "unable to create cluster cache tracker")
+ }
+
+ if err := (&remote.ClusterCacheReconciler{
+ Client: mgr.GetClient(),
+ Tracker: tracker,
+ WatchFilterValue: managerOpts.WatchFilterValue,
+ }).SetupWithManager(ctx, mgr, concurrency(vSphereClusterConcurrency)); err != nil {
+ setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
+ return nil, err
+ }
+
+ return tracker, nil
+ }
