Skip to content

Commit

Permalink
resolve comments and fix e2e
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Qiu <jqiu@redhat.com>
  • Loading branch information
qiujian16 committed Sep 19, 2022
1 parent 5e8946b commit 75ca22e
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 408 deletions.
193 changes: 84 additions & 109 deletions hack/logcheck.out

Large diffs are not rendered by default.

73 changes: 34 additions & 39 deletions pkg/syncer/apiimporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclient "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
workloadlisters "github.com/kcp-dev/kcp/pkg/client/listers/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/crdpuller"
"github.com/kcp-dev/kcp/pkg/logging"
clusterctl "github.com/kcp-dev/kcp/pkg/reconciler/workload/basecontroller"
)

Expand Down Expand Up @@ -71,7 +73,6 @@ func NewAPIImporter(
return nil, err
}

clusterIndexer := kcpInformerFactory.Workload().V1alpha1().SyncTargets().Informer().GetIndexer()
importIndexer := kcpInformerFactory.Apiresource().V1alpha1().APIResourceImports().Informer().GetIndexer()

indexers := map[string]cache.IndexFunc{
Expand Down Expand Up @@ -109,7 +110,7 @@ func NewAPIImporter(
kcpClusterClient: kcpClusterClient,
resourcesToSync: resourcesToSync,
apiresourceImportIndexer: importIndexer,
clusterIndexer: clusterIndexer,
syncTargetLister: kcpInformerFactory.Workload().V1alpha1().SyncTargets().Lister(),

location: location,
logicalClusterName: logicalClusterName,
Expand All @@ -122,7 +123,7 @@ type APIImporter struct {
kcpClusterClient *kcpclient.Cluster
resourcesToSync []string
apiresourceImportIndexer cache.Indexer
clusterIndexer cache.Indexer
syncTargetLister workloadlisters.SyncTargetLister

location string
logicalClusterName logicalcluster.Name
Expand All @@ -133,40 +134,43 @@ type APIImporter struct {
func (i *APIImporter) Start(ctx context.Context, pollInterval time.Duration) {
defer runtime.HandleCrash()

i.kcpInformerFactory.WaitForCacheSync(ctx.Done())
logger := logging.WithReconciler(klog.FromContext(ctx), "kcp-syncer-apiimporter")
ctx = klog.NewContext(ctx, logger)

klog.Infof("Starting API Importer for location %s in cluster %s", i.location, i.logicalClusterName)
logger.Info("Starting API Importer", "location", i.location, "cluster", i.logicalClusterName)

clusterContext := request.WithCluster(ctx, request.Cluster{Name: i.logicalClusterName})
go wait.UntilWithContext(clusterContext, func(innerCtx context.Context) {
i.ImportAPIs(innerCtx)
}, pollInterval)

<-ctx.Done()
i.Stop()
i.Stop(ctx)
}

func (i *APIImporter) Stop() {
klog.Infof("Stopping API Importer for location %s in cluster %s", i.location, i.logicalClusterName)
func (i *APIImporter) Stop(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.Info("Stopping API Importer", "location", i.location, "cluster", i.logicalClusterName)

objs, err := i.apiresourceImportIndexer.ByIndex(
clusterctl.LocationInLogicalClusterIndexName,
clusterctl.GetLocationInLogicalClusterIndexKey(i.location, i.logicalClusterName),
)
if err != nil {
klog.Errorf("error trying to list APIResourceImport objects for location %s in logical cluster %s: %v", i.location, i.logicalClusterName, err)
logger.Error(err, "error trying to list APIResourceImport objects", "location", i.location, "cluster", i.logicalClusterName)
}
for _, obj := range objs {
apiResourceImportToDelete := obj.(*apiresourcev1alpha1.APIResourceImport)
err := i.kcpClusterClient.Cluster(i.logicalClusterName).ApiresourceV1alpha1().APIResourceImports().Delete(request.WithCluster(context.Background(), request.Cluster{Name: i.logicalClusterName}), apiResourceImportToDelete.Name, metav1.DeleteOptions{})
if err != nil {
klog.Errorf("error deleting APIResourceImport %s: %v", apiResourceImportToDelete.Name, err)
logger.Error(err, "error deleting APIResourceImport", "apiResourceImport", apiResourceImportToDelete.Name)
}
}
}

func (i *APIImporter) ImportAPIs(ctx context.Context) {
clusterKey, err := kcpcache.MetaClusterNamespaceKeyFunc(&metav1.PartialObjectMetadata{
logger := klog.FromContext(ctx)
syncTargetKey, err := kcpcache.MetaClusterNamespaceKeyFunc(&metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: i.location,
Annotations: map[string]string{
Expand All @@ -175,35 +179,26 @@ func (i *APIImporter) ImportAPIs(ctx context.Context) {
},
})
if err != nil {
klog.Errorf("error creating APIResourceImport: %v", err)
logger.Error(err, "error building clusterKey")
return
}
clusterObj, exists, err := i.clusterIndexer.GetByKey(clusterKey)
syncTarget, err := i.syncTargetLister.Get(syncTargetKey)
if err != nil {
klog.Errorf("error creating APIResourceImport: %v", err)
return
}
if !exists {
klog.Errorf("the cluster object should exist in the index for location %s in logical cluster %s", i.location, i.logicalClusterName)
return
}
cluster, isCluster := clusterObj.(*workloadv1alpha1.SyncTarget)
if !isCluster {
klog.Errorf("the object retrieved from the cluster index for location %s in logical cluster %s should be a cluster object, but is of type: %T", i.location, i.logicalClusterName, clusterObj)
logger.Error(err, "error getting syncTarget")
return
}

// merge resourceToSync from synctarget with resourcesToSync set on the syncer's flag.
resourceToSyncSet := sets.NewString(i.resourcesToSync...)
for _, rs := range cluster.Status.SyncedResources {
for _, rs := range syncTarget.Status.SyncedResources {
resourceToSyncSet.Insert(fmt.Sprintf("%s.%s", rs.Resource, rs.Group))
}
resourcesToSync := resourceToSyncSet.List()

klog.Infof("Importing APIs from location %s in logical cluster %s (resources=%v)", i.location, i.logicalClusterName, resourcesToSync)
logger.Info("Importing APIs", "location", i.location, "cluster", i.logicalClusterName, "resources", resourcesToSync)
crds, err := i.schemaPuller.PullCRDs(ctx, resourcesToSync...)
if err != nil {
klog.Errorf("error pulling CRDs: %v", err)
logger.Error(err, "error pulling CRDs")
return
}

Expand All @@ -221,22 +216,22 @@ func (i *APIImporter) ImportAPIs(ctx context.Context) {
clusterctl.GetGVRForLocationInLogicalClusterIndexKey(i.location, i.logicalClusterName, gvr),
)
if err != nil {
klog.Errorf("error pulling CRDs: %v", err)
logger.Error(err, "error pulling CRDs")
continue
}
if len(objs) > 1 {
klog.Errorf("There should be only one APIResourceImport of GVR %s for location %s in logical cluster %s, but there was %d", gvr.String(), i.location, i.logicalClusterName, len(objs))
logger.Error(fmt.Errorf("failed to get APIResourceImport"), "There should be only one APIResourceImport", "gvr", gvr.String(), "location", i.location, "cluster", i.logicalClusterName, "obejcts", len(objs))
continue
}
if len(objs) == 1 {
apiResourceImport := objs[0].(*apiresourcev1alpha1.APIResourceImport).DeepCopy()
if err := apiResourceImport.Spec.SetSchema(crdVersion.Schema.OpenAPIV3Schema); err != nil {
klog.Errorf("Error setting schema: %v", err)
logger.Error(err, "Error setting schema")
continue
}
klog.Infof("Updating APIResourceImport %s|%s for SyncTarget %s", i.logicalClusterName, apiResourceImport.Name, i.location)
logger.Info("Updating APIResourceImport for SyncTarget", "cluster", i.logicalClusterName, "apiResourceImport", apiResourceImport.Name, "location", i.location)
if _, err := i.kcpClusterClient.Cluster(i.logicalClusterName).ApiresourceV1alpha1().APIResourceImports().Update(ctx, apiResourceImport, metav1.UpdateOptions{}); err != nil {
klog.Errorf("error updating APIResourceImport %s: %v", apiResourceImport.Name, err)
logger.Error(err, "error updating APIResourceImport", "apiResourceImport", apiResourceImport.Name)
continue
}
} else {
Expand All @@ -255,7 +250,7 @@ func (i *APIImporter) ImportAPIs(ctx context.Context) {
ObjectMeta: metav1.ObjectMeta{
Name: apiResourceImportName,
OwnerReferences: []metav1.OwnerReference{
clusterAsOwnerReference(cluster, true),
clusterAsOwnerReference(syncTarget, true),
},
Annotations: map[string]string{
logicalcluster.AnnotationKey: i.logicalClusterName.String(),
Expand All @@ -278,16 +273,16 @@ func (i *APIImporter) ImportAPIs(ctx context.Context) {
},
}
if err := apiResourceImport.Spec.SetSchema(crdVersion.Schema.OpenAPIV3Schema); err != nil {
klog.Errorf("Error setting schema: %v", err)
logger.Error(err, "Error setting schema")
continue
}
if value, found := pulledCrd.Annotations[apiextensionsv1.KubeAPIApprovedAnnotation]; found {
apiResourceImport.Annotations[apiextensionsv1.KubeAPIApprovedAnnotation] = value
}

klog.Infof("Creating APIResourceImport %s|%s", i.logicalClusterName, apiResourceImportName)
logger.Info("Creating APIResourceImport", "cluster", i.logicalClusterName, "apiResourceImport", apiResourceImportName)
if _, err := i.kcpClusterClient.Cluster(i.logicalClusterName).ApiresourceV1alpha1().APIResourceImports().Create(ctx, apiResourceImport, metav1.CreateOptions{}); err != nil {
klog.Errorf("error creating APIResourceImport %s: %v", apiResourceImport.Name, err)
logger.Error(err, "error creating APIResourceImport", "apiResourceImport", apiResourceImport.Name)
continue
}
}
Expand All @@ -302,19 +297,19 @@ func (i *APIImporter) ImportAPIs(ctx context.Context) {
clusterctl.GetGVRForLocationInLogicalClusterIndexKey(i.location, i.logicalClusterName, gvr),
)
if err != nil {
klog.Errorf("error pulling CRDs: %v", err)
logger.Error(err, "error pulling CRDs")
continue
}
if len(objs) > 1 {
klog.Errorf("There should be only one APIResourceImport of GVR %s for location %s in logical cluster %s, but there was %d", gvr.String(), i.location, i.logicalClusterName, len(objs))
logger.Error(fmt.Errorf("failed to get APIResourceImport"), "There should be only one APIResourceImport", "gvr", gvr.String(), "location", i.location, "cluster", i.logicalClusterName, "obejcts", len(objs))
continue
}
if len(objs) == 1 {
apiResourceImportToRemove := objs[0].(*apiresourcev1alpha1.APIResourceImport)
klog.Infof("Deleting APIResourceImport %s|%s", i.logicalClusterName, apiResourceImportToRemove.Name)
logger.Info("Deleting APIResourceImport", "cluster", i.logicalClusterName, "apiResourceImport", apiResourceImportToRemove.Name)
err := i.kcpClusterClient.Cluster(i.logicalClusterName).ApiresourceV1alpha1().APIResourceImports().Delete(ctx, apiResourceImportToRemove.Name, metav1.DeleteOptions{})
if err != nil {
klog.Errorf("error deleting APIResourceImport %s: %v", apiResourceImportToRemove.Name, err)
logger.Error(err, "error deleting APIResourceImport", "apiResourceImport", apiResourceImportToRemove.Name)
continue
}
}
Expand Down
Loading

0 comments on commit 75ca22e

Please sign in to comment.