diff --git a/pkg/reconciler/workload/basecontroller/controller.go b/pkg/reconciler/workload/basecontroller/controller.go deleted file mode 100644 index 00e8d32bd14..00000000000 --- a/pkg/reconciler/workload/basecontroller/controller.go +++ /dev/null @@ -1,276 +0,0 @@ -/* -Copyright 2021 The KCP Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package basecontroller - -import ( - "context" - "fmt" - "time" - - kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" - "github.com/kcp-dev/logicalcluster/v3" - - "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" - - apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1" - workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" - kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster" - apiresourcev1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/apiresource/v1alpha1" - workloadv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/workload/v1alpha1" - "github.com/kcp-dev/kcp/pkg/logging" -) - -const LocationInLogicalClusterIndexName = "LocationInLogicalCluster" - -func GetLocationInLogicalClusterIndexKey(location string, clusterName logicalcluster.Path) string { - return location + "/" + clusterName.String() -} - -// ClusterReconcileImpl defines the methods that ClusterReconciler -// will call in response to changes to Cluster resources. -type ClusterReconcileImpl interface { - Reconcile(ctx context.Context, cluster *workloadv1alpha1.SyncTarget) error - Cleanup(ctx context.Context, deletedCluster *workloadv1alpha1.SyncTarget) -} - -type ClusterQueue interface { - EnqueueAfter(*workloadv1alpha1.SyncTarget, time.Duration) -} - -// NewClusterReconciler returns a new controller which reconciles -// Cluster resources in the API server it reaches using the REST -// client. -func NewClusterReconciler( - name string, - reconciler ClusterReconcileImpl, - kcpClusterClient kcpclientset.ClusterInterface, - clusterInformer workloadv1alpha1informers.SyncTargetClusterInformer, - apiResourceImportInformer apiresourcev1alpha1informers.APIResourceImportClusterInformer, -) (*ClusterReconciler, ClusterQueue, error) { - queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name) - - c := &ClusterReconciler{ - name: name, - reconciler: reconciler, - kcpClusterClient: kcpClusterClient, - clusterIndexer: clusterInformer.Informer().GetIndexer(), - apiresourceImportIndexer: apiResourceImportInformer.Informer().GetIndexer(), - queue: queue, - } - - clusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { c.enqueue(obj) }, - UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, - DeleteFunc: func(obj interface{}) { c.deletedCluster(obj) }, - }) - apiResourceImportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(_, obj interface{}) { - c.enqueueAPIResourceImportRelatedCluster(obj) - }, - DeleteFunc: func(obj interface{}) { - c.enqueueAPIResourceImportRelatedCluster(obj) - }, - }) - - indexers := map[string]cache.IndexFunc{ - LocationInLogicalClusterIndexName: func(obj interface{}) ([]string, error) { - if apiResourceImport, ok := obj.(*apiresourcev1alpha1.APIResourceImport); ok { - return []string{GetLocationInLogicalClusterIndexKey(apiResourceImport.Spec.Location, logicalcluster.From(apiResourceImport).Path())}, nil - } - return []string{}, nil - }, - } - - // Ensure the indexers are only added if not already present. - for indexName := range c.apiresourceImportIndexer.GetIndexers() { - delete(indexers, indexName) - } - if len(indexers) > 0 { - if err := c.apiresourceImportIndexer.AddIndexers(indexers); err != nil { - return nil, nil, fmt.Errorf("failed to add indexer for APIResourceImport: %w", err) - } - } - - return c, queueAdapter{queue}, nil -} - -type queueAdapter struct { - queue interface { - AddAfter(item interface{}, duration time.Duration) - } -} - -func (a queueAdapter) EnqueueAfter(cl *workloadv1alpha1.SyncTarget, dur time.Duration) { - key, err := kcpcache.MetaClusterNamespaceKeyFunc(cl) - if err != nil { - runtime.HandleError(err) - return - } - a.queue.AddAfter(key, dur) -} - -type ClusterReconciler struct { - name string - reconciler ClusterReconcileImpl - kcpClusterClient kcpclientset.ClusterInterface - clusterIndexer cache.Indexer - apiresourceImportIndexer cache.Indexer - - queue workqueue.RateLimitingInterface -} - -func (c *ClusterReconciler) enqueue(obj interface{}) { - key, err := kcpcache.MetaClusterNamespaceKeyFunc(obj) - if err != nil { - runtime.HandleError(err) - return - } - - logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), c.name), key) - logger.V(2).Info("queueing SyncTarget") - c.queue.Add(key) -} - -func (c *ClusterReconciler) enqueueAPIResourceImportRelatedCluster(obj interface{}) { - var apiResourceImport *apiresourcev1alpha1.APIResourceImport - switch typedObj := obj.(type) { - case *apiresourcev1alpha1.APIResourceImport: - apiResourceImport = typedObj - case cache.DeletedFinalStateUnknown: - deletedImport, ok := typedObj.Obj.(*apiresourcev1alpha1.APIResourceImport) - if ok { - apiResourceImport = deletedImport - } - } - if apiResourceImport != nil { - c.enqueue(&metav1.PartialObjectMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: apiResourceImport.Spec.Location, - // TODO: (shawn-hurley) - Annotations: map[string]string{ - logicalcluster.AnnotationKey: logicalcluster.From(apiResourceImport).String(), - }, - }, - }) - } -} - -func (c *ClusterReconciler) startWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} - -func (c *ClusterReconciler) Start(ctx context.Context) { - defer runtime.HandleCrash() - defer c.queue.ShutDown() - - logger := logging.WithReconciler(klog.FromContext(ctx), c.name) - ctx = klog.NewContext(ctx, logger) - logger.Info("Starting controller") - defer logger.Info("Shutting down controller") - - go wait.Until(func() { c.startWorker(ctx) }, time.Millisecond*10, ctx.Done()) - - <-ctx.Done() -} - -func (c *ClusterReconciler) ShutDown() { - c.queue.ShutDown() -} - -func (c *ClusterReconciler) processNextWorkItem(ctx context.Context) bool { - // Wait until there is a new item in the working queue - k, quit := c.queue.Get() - if quit { - return false - } - key := k.(string) - - logger := logging.WithQueueKey(klog.FromContext(ctx), key) - ctx = klog.NewContext(ctx, logger) - logger.V(1).Info("processing key") - - // No matter what, tell the queue we're done with this key, to unblock - // other workers. - defer c.queue.Done(key) - - if err := c.process(ctx, key); err != nil { - runtime.HandleError(fmt.Errorf("%s: failed to sync %q, err: %w", c.name, key, err)) - c.queue.AddRateLimited(key) - return true - } - c.queue.Forget(key) - return true -} - -func (c *ClusterReconciler) process(ctx context.Context, key string) error { - logger := klog.FromContext(ctx) - obj, exists, err := c.clusterIndexer.GetByKey(key) - if err != nil { - return err - } - - if !exists { - logger.Info("object for key was deleted") - return nil - } - current := obj.(*workloadv1alpha1.SyncTarget).DeepCopy() - previous := current.DeepCopy() - - logger = logging.WithObject(logger, previous) - ctx = klog.NewContext(ctx, logger) - - if err := c.reconciler.Reconcile(ctx, current); err != nil { - return err - } - - // If the object being reconciled changed as a result, update it. - if !equality.Semantic.DeepEqual(previous.Status, current.Status) { - _, uerr := c.kcpClusterClient.Cluster(logicalcluster.From(current).Path()).WorkloadV1alpha1().SyncTargets().UpdateStatus(ctx, current, metav1.UpdateOptions{}) - return uerr - } - - return nil -} - -func (c *ClusterReconciler) deletedCluster(obj interface{}) { - logger := logging.WithReconciler(klog.Background(), c.name) - - castObj, ok := obj.(*workloadv1alpha1.SyncTarget) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - logger.Error(fmt.Errorf("unexpected tombstone %T, not %T", obj, cache.DeletedFinalStateUnknown{}), "couldn't get object from tombstone") - return - } - castObj, ok = tombstone.Obj.(*workloadv1alpha1.SyncTarget) - if !ok { - logger.Error(fmt.Errorf("unexpected tombstone %T, not %T", obj, &workloadv1alpha1.SyncTarget{}), "couldn't get object from tombstone") - return - } - } - logger = logging.WithObject(logger, castObj) - ctx := klog.NewContext(context.TODO(), logger) - logger.V(4).Info("responding to deletion of SyncTarget") - c.reconciler.Cleanup(ctx, castObj) -} diff --git a/pkg/reconciler/workload/heartbeat/heartbeat_controller.go b/pkg/reconciler/workload/heartbeat/heartbeat_controller.go index c32d726a0ea..6070b17931b 100644 --- a/pkg/reconciler/workload/heartbeat/heartbeat_controller.go +++ b/pkg/reconciler/workload/heartbeat/heartbeat_controller.go @@ -17,36 +17,160 @@ limitations under the License. package heartbeat import ( + "context" + "fmt" "time" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster" - apiresourcev1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/apiresource/v1alpha1" + workloadv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/workload/v1alpha1" workloadv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/workload/v1alpha1" - "github.com/kcp-dev/kcp/pkg/reconciler/workload/basecontroller" + "github.com/kcp-dev/kcp/pkg/logging" + "github.com/kcp-dev/kcp/pkg/reconciler/committer" ) -const ControllerName = "kcp-cluster-heartbeat" +const ControllerName = "kcp-synctarget-heartbeat" + +type Controller struct { + queue workqueue.RateLimitingInterface + kcpClusterClient kcpclientset.ClusterInterface + heartbeatThreshold time.Duration + commit CommitFunc + getSyncTarget func(clusterName logicalcluster.Name, name string) (*workloadv1alpha1.SyncTarget, error) +} func NewController( kcpClusterClient kcpclientset.ClusterInterface, - clusterInformer workloadv1alpha1informers.SyncTargetClusterInformer, - apiResourceImportInformer apiresourcev1alpha1informers.APIResourceImportClusterInformer, + syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, heartbeatThreshold time.Duration, -) (*basecontroller.ClusterReconciler, error) { - cm := &clusterManager{ +) (*Controller, error) { + queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName) + + c := &Controller{ + queue: queue, + kcpClusterClient: kcpClusterClient, heartbeatThreshold: heartbeatThreshold, + commit: committer.NewCommitter[*SyncTarget, Patcher, *SyncTargetSpec, *SyncTargetStatus](kcpClusterClient.WorkloadV1alpha1().SyncTargets()), + getSyncTarget: func(clusterName logicalcluster.Name, name string) (*workloadv1alpha1.SyncTarget, error) { + return syncTargetInformer.Cluster(clusterName).Lister().Get(name) + }, + } + + syncTargetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.enqueue(obj) }, + UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, + }) + + return c, nil +} + +type SyncTarget = workloadv1alpha1.SyncTarget +type SyncTargetSpec = workloadv1alpha1.SyncTargetSpec +type SyncTargetStatus = workloadv1alpha1.SyncTargetStatus +type Patcher = workloadv1alpha1client.SyncTargetInterface +type Resource = committer.Resource[*SyncTargetSpec, *SyncTargetStatus] +type CommitFunc = func(context.Context, *Resource, *Resource) error + +func (c *Controller) enqueue(obj interface{}) { + key, err := kcpcache.MetaClusterNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + + logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key) + logger.V(2).Info("queueing SyncTarget") + c.queue.Add(key) +} + +func (c *Controller) Start(ctx context.Context) { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + + logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName) + ctx = klog.NewContext(ctx, logger) + logger.Info("Starting controller") + defer logger.Info("Shutting down controller") + + go wait.UntilWithContext(ctx, c.startWorker, time.Second) + + <-ctx.Done() +} + +func (c *Controller) startWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + // Wait until there is a new item in the working queue + k, quit := c.queue.Get() + if quit { + return false + } + key := k.(string) + + logger := logging.WithQueueKey(klog.FromContext(ctx), key) + ctx = klog.NewContext(ctx, logger) + logger.V(1).Info("processing key") + + // No matter what, tell the queue we're done with this key, to unblock + // other workers. + defer c.queue.Done(key) + + if err := c.process(ctx, key); err != nil { + runtime.HandleError(fmt.Errorf("%s: failed to sync %q, err: %w", ControllerName, key, err)) + c.queue.AddRateLimited(key) + return true + } + c.queue.Forget(key) + return true +} + +func (c *Controller) process(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) + clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key) + if err != nil { + logger.Error(err, "error parsing key") + return nil } - r, queue, err := basecontroller.NewClusterReconciler( - ControllerName, - cm, - kcpClusterClient, - clusterInformer, - apiResourceImportInformer, - ) + current, err := c.getSyncTarget(clusterName, name) if err != nil { - return nil, err + if !apierrors.IsNotFound(err) { + logger.Error(err, "failed to get SyncTarget from lister", "cluster", clusterName, "name", name) + } + + return nil + } + + previous := current + current = current.DeepCopy() + + logger = logging.WithObject(logger, previous) + ctx = klog.NewContext(ctx, logger) + + var errs []error + if err := c.reconcile(ctx, key, current); err != nil { + errs = append(errs, err) } - cm.enqueueClusterAfter = queue.EnqueueAfter - return r, nil + + oldResource := &Resource{ObjectMeta: previous.ObjectMeta, Spec: &previous.Spec, Status: &previous.Status} + newResource := &Resource{ObjectMeta: current.ObjectMeta, Spec: ¤t.Spec, Status: ¤t.Status} + if err := c.commit(ctx, oldResource, newResource); err != nil { + errs = append(errs, err) + } + + return utilerrors.NewAggregate(errs) } diff --git a/pkg/reconciler/workload/heartbeat/heartbeat_manager.go b/pkg/reconciler/workload/heartbeat/heartbeat_reconciler.go similarity index 81% rename from pkg/reconciler/workload/heartbeat/heartbeat_manager.go rename to pkg/reconciler/workload/heartbeat/heartbeat_reconciler.go index 8f53fcbbd39..21050d2fe95 100644 --- a/pkg/reconciler/workload/heartbeat/heartbeat_manager.go +++ b/pkg/reconciler/workload/heartbeat/heartbeat_reconciler.go @@ -25,17 +25,9 @@ import ( conditionsv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/apis/conditions/v1alpha1" "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions" workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" - "github.com/kcp-dev/kcp/pkg/reconciler/workload/basecontroller" ) -var _ basecontroller.ClusterReconcileImpl = (*clusterManager)(nil) - -type clusterManager struct { - heartbeatThreshold time.Duration - enqueueClusterAfter func(*workloadv1alpha1.SyncTarget, time.Duration) -} - -func (c *clusterManager) Reconcile(ctx context.Context, cluster *workloadv1alpha1.SyncTarget) error { +func (c *Controller) reconcile(ctx context.Context, key string, cluster *workloadv1alpha1.SyncTarget) error { logger := klog.FromContext(ctx) defer conditions.SetSummary( cluster, @@ -70,11 +62,8 @@ func (c *clusterManager) Reconcile(ctx context.Context, cluster *workloadv1alpha // Enqueue another check after which the heartbeat should have been updated again. dur := time.Until(latestHeartbeat.Add(c.heartbeatThreshold)) - c.enqueueClusterAfter(cluster, dur) + c.queue.AddAfter(key, dur) } return nil } - -func (c *clusterManager) Cleanup(ctx context.Context, deletedCluster *workloadv1alpha1.SyncTarget) { -} diff --git a/pkg/reconciler/workload/heartbeat/heartbeat_manager_test.go b/pkg/reconciler/workload/heartbeat/heartbeat_reconciler_test.go similarity index 58% rename from pkg/reconciler/workload/heartbeat/heartbeat_manager_test.go rename to pkg/reconciler/workload/heartbeat/heartbeat_reconciler_test.go index 069f2e43bc0..4e04739ac30 100644 --- a/pkg/reconciler/workload/heartbeat/heartbeat_manager_test.go +++ b/pkg/reconciler/workload/heartbeat/heartbeat_reconciler_test.go @@ -23,13 +23,25 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" conditionsv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/apis/conditions/v1alpha1" workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" ) -func TestManager(t *testing.T) { - for _, c := range []struct { +type fakeDelayingQueue struct { + workqueue.RateLimitingInterface + duration time.Duration +} + +var _ workqueue.DelayingInterface = (*fakeDelayingQueue)(nil) + +func (f *fakeDelayingQueue) AddAfter(obj interface{}, duration time.Duration) { + f.duration = duration +} + +func TestReconcile(t *testing.T) { + for _, tc := range []struct { desc string lastHeartbeatTime time.Time wantDur time.Duration @@ -47,18 +59,17 @@ func TestManager(t *testing.T) { lastHeartbeatTime: time.Now().Add(-90 * time.Second), wantReady: false, }} { - t.Run(c.desc, func(t *testing.T) { - var enqueued time.Duration - enqueueFunc := func(_ *workloadv1alpha1.SyncTarget, dur time.Duration) { - enqueued = dur + t.Run(tc.desc, func(t *testing.T) { + queue := &fakeDelayingQueue{ + RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "testing"), } - mgr := clusterManager{ - heartbeatThreshold: time.Minute, - enqueueClusterAfter: enqueueFunc, + c := &Controller{ + queue: queue, + heartbeatThreshold: time.Minute, } ctx := context.Background() - heartbeat := metav1.NewTime(c.lastHeartbeatTime) - cl := &workloadv1alpha1.SyncTarget{ + heartbeat := metav1.NewTime(tc.lastHeartbeatTime) + syncTarget := &workloadv1alpha1.SyncTarget{ Status: workloadv1alpha1.SyncTargetStatus{ Conditions: []conditionsv1alpha1.Condition{{ Type: workloadv1alpha1.HeartbeatHealthy, @@ -67,18 +78,18 @@ func TestManager(t *testing.T) { LastSyncerHeartbeatTime: &heartbeat, }, } - if err := mgr.Reconcile(ctx, cl); err != nil { - t.Fatalf("Reconcile: %v", err) + if err := c.reconcile(ctx, "somekey", syncTarget); err != nil { + t.Fatalf("reconcile: %v", err) } - // actual enqueued time must not be more than 30s off from desired enqueue time. + // actual enqueued time must not be more than 30ms off from desired enqueue time. delta := 30 * time.Millisecond - if c.wantDur-delta > enqueued { - t.Errorf("next enqueue time; got %s, want %s", enqueued, c.wantDur) + if tc.wantDur-delta > queue.duration { + t.Errorf("next enqueue time; got %s, want %s", queue.duration, tc.wantDur) } - isReady := cl.GetConditions()[0].Status == corev1.ConditionTrue - if isReady != c.wantReady { - t.Errorf("cluster Ready; got %t, want %t", isReady, c.wantReady) + isReady := syncTarget.GetConditions()[0].Status == corev1.ConditionTrue + if isReady != tc.wantReady { + t.Errorf("SyncTarget Ready; got %t, want %t", isReady, tc.wantReady) } // TODO: check wantReady. }) diff --git a/tmc/pkg/server/controllers.go b/tmc/pkg/server/controllers.go index 166cc15e60e..d8bc078e455 100644 --- a/tmc/pkg/server/controllers.go +++ b/tmc/pkg/server/controllers.go @@ -127,7 +127,6 @@ func (s *Server) installSyncTargetHeartbeatController(ctx context.Context, confi c, err := heartbeat.NewController( kcpClusterClient, s.Core.KcpSharedInformerFactory.Workload().V1alpha1().SyncTargets(), - s.Core.KcpSharedInformerFactory.Apiresource().V1alpha1().APIResourceImports(), s.Options.Controllers.SyncTargetHeartbeat.HeartbeatThreshold, ) if err != nil {