Skip to content

Commit

Permalink
Merge pull request #2214 from bipuladh/1980-storage-upsync-controller…
Browse files Browse the repository at this point in the history
…-syncer

✨ Add Upsync controller
  • Loading branch information
openshift-merge-robot authored Feb 16, 2023
2 parents 02cfa8e + 5fc0cd7 commit ed5fced
Show file tree
Hide file tree
Showing 8 changed files with 1,748 additions and 22 deletions.
20 changes: 14 additions & 6 deletions pkg/syncer/spec/spec_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
Expand Down Expand Up @@ -175,6 +176,18 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
if gvr == namespaceGVR {
return
}
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting key for type %T: %w", obj, err))
Expand All @@ -188,7 +201,6 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
logger.V(3).Info("processing delete event")

var nsLocatorHolder *unstructured.Unstructured
var ok bool
// Handle namespaced resources
if namespace != "" {
// Use namespace lister
Expand All @@ -214,11 +226,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
}
} else {
// The nsLocatorHolder is in the resource itself for cluster-scoped resources.
nsLocatorHolder, ok = obj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}
nsLocatorHolder = unstrObj
}
logger = logging.WithObject(logger, nsLocatorHolder)

Expand Down
26 changes: 25 additions & 1 deletion pkg/syncer/status/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
Expand Down Expand Up @@ -116,6 +117,15 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
if gvr == namespaceGVR {
return
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
runtime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

c.AddToQueue(gvr, obj, logger)
},
UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
Expand All @@ -124,7 +134,9 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
}
oldUnstrob := oldObj.(*unstructured.Unstructured)
newUnstrob := newObj.(*unstructured.Unstructured)

if newUnstrob.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}
if !deepEqualFinalizersAndStatus(oldUnstrob, newUnstrob) {
c.AddToQueue(gvr, newUnstrob, logger)
}
Expand All @@ -133,6 +145,18 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
if gvr == namespaceGVR {
return
}
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
runtime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

c.AddToQueue(gvr, obj, logger)
},
})
Expand Down
5 changes: 5 additions & 0 deletions pkg/syncer/status/status_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
if !ok {
return fmt.Errorf("object to synchronize is expected to be Unstructured, but is %T", obj)
}
if u.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+c.syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
logger.V(4).Info("do not update the status in upstream, since the downstream resource is in Upsync mode")
return nil
}

return c.updateStatusInUpstream(ctx, gvr, upstreamLister, upstreamNamespace, upstreamName, upstreamClusterName, u)
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/kcp-dev/kcp/pkg/syncer/resourcesync"
"github.com/kcp-dev/kcp/pkg/syncer/spec"
"github.com/kcp-dev/kcp/pkg/syncer/status"
"github.com/kcp-dev/kcp/pkg/syncer/upsync"
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
)

Expand Down Expand Up @@ -292,6 +293,12 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
return err
}

logger.Info("Creating resource upsyncer")
upSyncer, err := upsync.NewUpSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamUpsyncerClusterClient, downstreamDynamicClient, ddsifForUpstreamUpsyncer, ddsifForDownstream, syncTarget.GetUID())
if err != nil {
return err
}

// Start and sync informer factories
var cacheSyncsForAlwaysRequiredGVRs []cache.InformerSynced
for _, alwaysRequired := range []string{"secrets", "namespaces"} {
Expand Down Expand Up @@ -327,7 +334,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
go syncTargetGVRSource.Start(ctx, 1)
go specSyncer.Start(ctx, numSyncerThreads)
go statusSyncer.Start(ctx, numSyncerThreads)

go upSyncer.Start(ctx, numSyncerThreads)
go downstreamNamespaceController.Start(ctx, numSyncerThreads)

// Create and start GVR-specific controllers through controller managers
Expand Down
Loading

0 comments on commit ed5fced

Please sign in to comment.